Skip to main content

simd_kernels/kernels/
window.rs

1// Copyright (c) 2025 SpaceCell Enterprises Ltd
2// SPDX-License-Identifier: AGPL-3.0-or-later
3// Commercial licensing available. See LICENSE and LICENSING.md.
4
5//! # **Window Functions Kernels Module** - *High-Performance Analytical Window Operations*
6//!
7//! Advanced window function kernels for sliding window computations,
8//! ranking operations, and positional analytics with SIMD acceleration and null-aware semantics.
9//! Backbone of time series analysis, analytical SQL window functions, and chunked streaming computations.
10//!
11//! ## Core Operations
12//! - **Moving averages**: Rolling mean calculations with configurable window sizes
13//! - **Cumulative functions**: Running sums, products, and statistical aggregations  
14//! - **Ranking functions**: ROW_NUMBER, RANK, DENSE_RANK with tie-handling strategies
15//! - **Lead/lag operations**: Positional value access with configurable offsets
16//! - **Percentile functions**: Moving quantile calculations with interpolation support
17//! - **Window aggregates**: MIN, MAX, SUM operations over sliding windows
18
19include!(concat!(env!("OUT_DIR"), "/simd_lanes.rs"));
20
21use std::marker::PhantomData;
22
23use minarrow::{
24    Bitmask, BooleanAVT, BooleanArray, FloatArray, Integer, IntegerArray, Length, MaskedArray,
25    Offset, StringArray, Vec64,
26    aliases::{FloatAVT, IntegerAVT},
27    enums::error::KernelError,
28    vec64,
29};
30use num_traits::{Float, Num, NumCast, One, Zero};
31
32use minarrow::StringAVT;
33use minarrow::utils::confirm_mask_capacity;
34
35use crate::kernels::sort::total_cmp_f;
36
37// Helpers
38#[inline(always)]
39fn new_null_mask(len: usize) -> Bitmask {
40    Bitmask::new_set_all(len, false)
41}
42
43#[inline(always)]
44fn prealloc_vec<T: Copy>(len: usize) -> Vec64<T> {
45    let mut v = Vec64::<T>::with_capacity(len);
46    unsafe { v.set_len(len) };
47    v
48}
49
50// Rolling kernels (sum, product, min, max, mean, count)
51
52/// Zero-allocation variant that writes directly to caller-provided output buffers.
53///
54/// Generic sliding window aggregator for kernels that allow an
55/// incremental push and pop update (sum, product, etc.).
56/// Always emits the running aggregate, even when the subwindow has nulls.
57/// Only flags "valid" once the full subwindow has been seen.
58///
59/// Panics if `out.len() != data.len()` or `out_mask.capacity() < data.len()`.
60#[inline(always)]
61fn rolling_push_pop_to<T, FAdd, FRem>(
62    data: &[T],
63    mask: Option<&Bitmask>,
64    subwindow: usize,
65    mut add: FAdd,
66    mut remove: FRem,
67    zero: T,
68    out: &mut [T],
69    out_mask: &mut Bitmask,
70) where
71    T: Copy,
72    FAdd: FnMut(T, T) -> T,
73    FRem: FnMut(T, T) -> T,
74{
75    let n = data.len();
76    assert_eq!(
77        n,
78        out.len(),
79        "rolling_push_pop_to: input/output length mismatch"
80    );
81
82    if subwindow == 0 {
83        for slot in out.iter_mut() {
84            *slot = zero;
85        }
86        return;
87    }
88
89    let mut agg = zero;
90    let mut invalids = 0usize;
91    for i in 0..n {
92        if mask.map_or(true, |m| unsafe { m.get_unchecked(i) }) {
93            agg = add(agg, data[i]);
94        } else {
95            invalids += 1;
96        }
97        if i + 1 > subwindow {
98            let j = i + 1 - subwindow - 1;
99            if mask.map_or(true, |m| unsafe { m.get_unchecked(j) }) {
100                agg = remove(agg, data[j]);
101            } else {
102                invalids -= 1;
103            }
104        }
105        if i + 1 < subwindow {
106            unsafe { out_mask.set_unchecked(i, false) };
107            out[i] = zero;
108        } else {
109            let ok = invalids == 0;
110            unsafe { out_mask.set_unchecked(i, ok) };
111            out[i] = agg;
112        }
113    }
114}
115
116/// Allocating variant: creates new output buffers internally.
117#[inline(always)]
118pub fn rolling_push_pop<T, FAdd, FRem>(
119    data: &[T],
120    mask: Option<&Bitmask>,
121    subwindow: usize,
122    add: FAdd,
123    remove: FRem,
124    zero: T,
125) -> (Vec64<T>, Bitmask)
126where
127    T: Copy,
128    FAdd: FnMut(T, T) -> T,
129    FRem: FnMut(T, T) -> T,
130{
131    let n = data.len();
132    let mut out = prealloc_vec::<T>(n);
133    let mut out_mask = new_null_mask(n);
134    rolling_push_pop_to(
135        data,
136        mask,
137        subwindow,
138        add,
139        remove,
140        zero,
141        &mut out,
142        &mut out_mask,
143    );
144    (out, out_mask)
145}
146
147/// Zero-allocation variant that writes directly to caller-provided output buffers.
148///
149/// Generic rolling extreme aggregator (min/max) for a subwindow over a slice.
150///
151/// Panics if `out.len() != data.len()`.
152#[inline(always)]
153pub fn rolling_extreme_to<T, F>(
154    data: &[T],
155    mask: Option<&Bitmask>,
156    subwindow: usize,
157    mut better: F,
158    zero: T,
159    out: &mut [T],
160    out_mask: &mut Bitmask,
161) where
162    T: Copy,
163    F: FnMut(&T, &T) -> bool,
164{
165    let n = data.len();
166    assert_eq!(
167        n,
168        out.len(),
169        "rolling_extreme_to: input/output length mismatch"
170    );
171
172    if subwindow == 0 {
173        return;
174    }
175
176    for i in 0..n {
177        if i + 1 < subwindow {
178            unsafe { out_mask.set_unchecked(i, false) };
179            out[i] = zero;
180            continue;
181        }
182        let start = i + 1 - subwindow;
183        let mut found = false;
184        let mut extreme = zero;
185        for j in start..=i {
186            if mask.map_or(true, |m| unsafe { m.get_unchecked(j) }) {
187                if !found {
188                    extreme = data[j];
189                    found = true;
190                } else if better(&data[j], &extreme) {
191                    extreme = data[j];
192                }
193            } else {
194                found = false;
195                break;
196            }
197        }
198        unsafe { out_mask.set_unchecked(i, found) };
199        out[i] = if found { extreme } else { zero };
200    }
201}
202
203/// Allocating variant: creates new output buffers internally.
204#[inline(always)]
205pub fn rolling_extreme<T, F>(
206    data: &[T],
207    mask: Option<&Bitmask>,
208    subwindow: usize,
209    better: F,
210    zero: T,
211) -> (Vec64<T>, Bitmask)
212where
213    T: Copy,
214    F: FnMut(&T, &T) -> bool,
215{
216    let n = data.len();
217    let mut out = prealloc_vec::<T>(n);
218    let mut out_mask = new_null_mask(n);
219    rolling_extreme_to(data, mask, subwindow, better, zero, &mut out, &mut out_mask);
220    (out, out_mask)
221}
222
223/// Zero-allocation variant that writes directly to caller-provided output buffers.
224///
225/// Computes rolling sums over a sliding window for integer data with null-aware semantics.
226/// Panics if `out.len() != data.len()`.
227#[inline]
228pub fn rolling_sum_int_to<T: Num + Copy + Zero>(
229    data: &[T],
230    mask: Option<&Bitmask>,
231    subwindow: usize,
232    out: &mut [T],
233    out_mask: &mut Bitmask,
234) {
235    rolling_push_pop_to(
236        data,
237        mask,
238        subwindow,
239        |a, b| a + b,
240        |a, b| a - b,
241        T::zero(),
242        out,
243        out_mask,
244    );
245    if mask.is_some() && subwindow > 0 && subwindow - 1 < out.len() {
246        unsafe { out_mask.set_unchecked(subwindow - 1, false) };
247        out[subwindow - 1] = T::zero();
248    }
249}
250
251/// Computes rolling sums over a sliding window for integer data with null-aware semantics.
252///
253/// Applies a sliding window of configurable size to compute cumulative sums, employing
254/// incremental computation to avoid O(n²) complexity through efficient push-pop operations.
255/// Each position in the output represents the sum of values within the preceding window.
256///
257/// ## Parameters
258/// * `window` - Integer array view containing the data, offset, and length information
259/// * `subwindow` - Size of the sliding window (number of elements to sum)
260///
261/// ## Returns
262/// Returns an `IntegerArray<T>` containing:
263/// - Rolling sums for each position where a complete window exists
264/// - Zero values for positions before the window is complete
265/// - Null mask indicating validity (false for incomplete windows or null-contaminated windows)
266#[inline]
267pub fn rolling_sum_int<T: Num + Copy + Zero>(
268    window: IntegerAVT<'_, T>,
269    subwindow: usize,
270) -> IntegerArray<T> {
271    let (arr, offset, len) = window;
272    let data = &arr.data[offset..offset + len];
273    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
274    let mut out = prealloc_vec::<T>(len);
275    let mut out_mask = new_null_mask(len);
276    rolling_sum_int_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
277    IntegerArray {
278        data: out.into(),
279        null_mask: Some(out_mask),
280    }
281}
282
283/// Zero-allocation variant that writes directly to caller-provided output buffers.
284///
285/// Computes rolling sums over a sliding window for floating-point data.
286/// Panics if `out.len() != data.len()`.
287#[inline]
288pub fn rolling_sum_float_to<T: Float + Copy + Zero>(
289    data: &[T],
290    mask: Option<&Bitmask>,
291    subwindow: usize,
292    out: &mut [T],
293    out_mask: &mut Bitmask,
294) {
295    rolling_push_pop_to(
296        data,
297        mask,
298        subwindow,
299        |a, b| a + b,
300        |a, b| a - b,
301        T::zero(),
302        out,
303        out_mask,
304    );
305    if subwindow > 0 && subwindow - 1 < out.len() {
306        out_mask.set(subwindow - 1, false);
307        out[subwindow - 1] = T::zero();
308    }
309}
310
311/// Computes rolling sums over a sliding window for floating-point data with IEEE 754 compliance.
312///
313/// Applies incremental computation to calculate cumulative sums across sliding windows,
314/// maintaining numerical stability through careful accumulation strategies. Handles
315/// special floating-point values (infinity, NaN) according to IEEE 754 semantics.
316///
317/// ## Parameters
318/// * `window` - Float array view containing the data, offset, and length information
319/// * `subwindow` - Size of the sliding window for summation
320///
321/// ## Returns
322/// Returns a `FloatArray<T>` containing:
323/// - Rolling sums computed incrementally for efficiency
324/// - Zero values for positions with incomplete windows
325/// - Proper null mask for window validity tracking
326#[inline]
327pub fn rolling_sum_float<T: Float + Copy + Zero>(
328    window: FloatAVT<'_, T>,
329    subwindow: usize,
330) -> FloatArray<T> {
331    let (arr, offset, len) = window;
332    let data = &arr.data[offset..offset + len];
333    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
334    let mut out = prealloc_vec::<T>(len);
335    let mut out_mask = new_null_mask(len);
336    rolling_sum_float_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
337    FloatArray {
338        data: out.into(),
339        null_mask: Some(out_mask),
340    }
341}
342
343/// Zero-allocation variant that writes directly to caller-provided output buffers.
344///
345/// Computes rolling sums over i32 data (pre-converted from booleans).
346/// Panics if `out.len() != data.len()`.
347#[inline]
348pub fn rolling_sum_bool_to(
349    data: &[i32],
350    mask: Option<&Bitmask>,
351    subwindow: usize,
352    out: &mut [i32],
353    out_mask: &mut Bitmask,
354) {
355    rolling_push_pop_to(
356        data,
357        mask,
358        subwindow,
359        |a, b| a + b,
360        |a, b| a - b,
361        0,
362        out,
363        out_mask,
364    );
365    if subwindow > 0 && subwindow - 1 < out.len() {
366        out_mask.set(subwindow - 1, false);
367        out[subwindow - 1] = 0;
368    }
369}
370
371/// Computes rolling sums over boolean data, counting true values within sliding windows.
372///
373/// Treats boolean values as integers (true=1, false=0) and applies sliding window
374/// summation to count true occurrences within each window position. Essential for
375/// constructing conditional aggregations and boolean pattern analysis.
376///
377/// ## Parameters
378/// * `window` - Boolean array view with offset and length specifications
379/// * `subwindow` - Number of boolean values to consider in each sliding window
380///
381/// ## Returns
382/// Returns an `IntegerArray<i32>` containing:
383/// - Count of true values within each complete window
384/// - Zero for positions with incomplete windows
385/// - Null mask indicating window completeness and null contamination
386#[inline]
387pub fn rolling_sum_bool(window: BooleanAVT<'_, ()>, subwindow: usize) -> IntegerArray<i32> {
388    let (arr, offset, len) = window;
389    let bools: Vec<i32> = arr.iter_range(offset, len).map(|b| b as i32).collect();
390    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
391    let mut out = prealloc_vec::<i32>(len);
392    let mut out_mask = new_null_mask(len);
393    rolling_sum_bool_to(&bools, mask.as_ref(), subwindow, &mut out, &mut out_mask);
394    IntegerArray {
395        data: out.into(),
396        null_mask: Some(out_mask),
397    }
398}
399
400/// Zero-allocation variant that writes directly to caller-provided output buffers.
401#[inline]
402pub fn rolling_product_int_to<T: Num + Copy + One + Zero + PartialEq>(
403    data: &[T],
404    mask: Option<&Bitmask>,
405    subwindow: usize,
406    out: &mut [T],
407    out_mask: &mut Bitmask,
408) {
409    let n = data.len();
410    assert_eq!(
411        n,
412        out.len(),
413        "rolling_product_int_to: input/output length mismatch"
414    );
415
416    if subwindow == 0 {
417        for slot in out.iter_mut() {
418            *slot = T::zero();
419        }
420        return;
421    }
422
423    // Track the product of non-zero elements and a separate zero count
424    // to avoid integer division-by-zero when a zero leaves the window.
425    let mut nz_product = T::one();
426    let mut zero_count = 0usize;
427    let mut invalids = 0usize;
428
429    for i in 0..n {
430        if mask.map_or(true, |m| unsafe { m.get_unchecked(i) }) {
431            if data[i] == T::zero() {
432                zero_count += 1;
433            } else {
434                nz_product = nz_product * data[i];
435            }
436        } else {
437            invalids += 1;
438        }
439
440        if i + 1 > subwindow {
441            let j = i + 1 - subwindow - 1;
442            if mask.map_or(true, |m| unsafe { m.get_unchecked(j) }) {
443                if data[j] == T::zero() {
444                    zero_count -= 1;
445                } else {
446                    nz_product = nz_product / data[j];
447                }
448            } else {
449                invalids -= 1;
450            }
451        }
452
453        if i + 1 < subwindow {
454            unsafe { out_mask.set_unchecked(i, false) };
455            out[i] = T::zero();
456        } else {
457            let ok = invalids == 0;
458            unsafe { out_mask.set_unchecked(i, ok) };
459            out[i] = if zero_count > 0 {
460                T::zero()
461            } else {
462                nz_product
463            };
464        }
465    }
466}
467
468/// Computes rolling products over a sliding window for integer data with overflow protection.
469///
470/// Applies multiplicative aggregation across sliding windows using incremental computation
471/// through division operations. Maintains numerical stability through careful handling of
472/// zero values and potential overflow conditions in integer arithmetic.
473#[inline]
474pub fn rolling_product_int<T: Num + Copy + One + Zero>(
475    window: IntegerAVT<'_, T>,
476    subwindow: usize,
477) -> IntegerArray<T> {
478    let (arr, offset, len) = window;
479    let data = &arr.data[offset..offset + len];
480    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
481    let mut out = prealloc_vec::<T>(len);
482    let mut out_mask = new_null_mask(len);
483    rolling_product_int_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
484    IntegerArray {
485        data: out.into(),
486        null_mask: Some(out_mask),
487    }
488}
489
490/// Zero-allocation variant that writes directly to caller-provided output buffers.
491#[inline]
492pub fn rolling_product_float_to<T: Float + Copy + One + Zero>(
493    data: &[T],
494    mask: Option<&Bitmask>,
495    subwindow: usize,
496    out: &mut [T],
497    out_mask: &mut Bitmask,
498) {
499    rolling_push_pop_to(
500        data,
501        mask,
502        subwindow,
503        |a, b| a * b,
504        |a, b| a / b,
505        T::one(),
506        out,
507        out_mask,
508    );
509}
510
511/// Computes rolling products over floating-point data with IEEE 754 mathematical semantics.
512///
513/// Performs multiplicative aggregation using incremental computation strategies that
514/// maintain numerical precision through careful handling of special values (infinity,
515/// NaN, zero) according to IEEE 754 standards.
516#[inline]
517pub fn rolling_product_float<T: Float + Copy + One + Zero>(
518    window: FloatAVT<'_, T>,
519    subwindow: usize,
520) -> FloatArray<T> {
521    let (arr, offset, len) = window;
522    let data = &arr.data[offset..offset + len];
523    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
524    let mut out = prealloc_vec::<T>(len);
525    let mut out_mask = new_null_mask(len);
526    rolling_product_float_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
527    FloatArray {
528        data: out.into(),
529        null_mask: Some(out_mask),
530    }
531}
532
533/// Zero-allocation variant that writes directly to caller-provided output buffers.
534///
535/// Computes rolling logical AND over boolean data (pre-converted to i32: 1=true, 0=false).
536#[inline]
537pub fn rolling_product_bool_to(
538    data: &[i32],
539    mask: Option<&Bitmask>,
540    subwindow: usize,
541    out: &mut Bitmask,
542    out_mask: &mut Bitmask,
543) {
544    let n = data.len();
545    for i in 0..n {
546        let start = if i + 1 >= subwindow {
547            i + 1 - subwindow
548        } else {
549            0
550        };
551        let mut acc = true;
552        let mut valid = subwindow > 0 && i + 1 >= subwindow;
553        for j in start..=i {
554            let is_valid = mask.map_or(true, |m| unsafe { m.get_unchecked(j) });
555            if is_valid {
556                acc &= data[j] != 0;
557            } else {
558                valid = false;
559                break;
560            }
561        }
562        unsafe { out_mask.set_unchecked(i, valid) };
563        out.set(i, valid && acc);
564    }
565}
566
567/// Computes rolling logical AND operations over boolean data within sliding windows.
568///
569/// Treats boolean multiplication as logical AND operations, computing the conjunction
570/// of all boolean values within each sliding window. Essential for constructing
571/// compound logical conditions and boolean pattern validation.
572#[inline]
573pub fn rolling_product_bool(window: BooleanAVT<'_, ()>, subwindow: usize) -> BooleanArray<()> {
574    let (arr, offset, len) = window;
575    let n = len;
576    let mut out_mask = new_null_mask(n);
577    let mut out = Bitmask::new_set_all(n, false);
578
579    for i in 0..n {
580        let start = if i + 1 >= subwindow {
581            i + 1 - subwindow
582        } else {
583            0
584        };
585        let mut acc = true;
586        let mut valid = subwindow > 0 && i + 1 >= subwindow;
587        for j in start..=i {
588            match unsafe { arr.get_unchecked(offset + j) } {
589                Some(val) => acc &= val,
590                None => {
591                    valid = false;
592                    break;
593                }
594            }
595        }
596        unsafe { out_mask.set_unchecked(i, valid) };
597        out.set(i, valid && acc);
598    }
599
600    BooleanArray {
601        data: out.into(),
602        null_mask: Some(out_mask),
603        len: n,
604        _phantom: PhantomData,
605    }
606}
607
608/// Zero-allocation variant that writes directly to caller-provided output buffers.
609#[inline]
610pub fn rolling_mean_int_to<T: NumCast + Copy + Zero>(
611    data: &[T],
612    mask: Option<&Bitmask>,
613    subwindow: usize,
614    out: &mut [f64],
615    out_mask: &mut Bitmask,
616) {
617    let n = data.len();
618    if subwindow == 0 {
619        return;
620    }
621    for i in 0..n {
622        if i + 1 < subwindow {
623            unsafe { out_mask.set_unchecked(i, false) };
624            out[i] = 0.0;
625            continue;
626        }
627        let start = i + 1 - subwindow;
628        let mut sum = 0.0;
629        let mut valid = true;
630        for j in start..=i {
631            if mask.map_or(true, |m| unsafe { m.get_unchecked(j) }) {
632                sum += num_traits::cast(data[j]).unwrap_or(0.0);
633            } else {
634                valid = false;
635                break;
636            }
637        }
638        unsafe { out_mask.set_unchecked(i, valid) };
639        out[i] = if valid { sum / subwindow as f64 } else { 0.0 };
640    }
641    if subwindow > 0 && subwindow - 1 < out.len() {
642        unsafe { out_mask.set_unchecked(subwindow - 1, false) };
643        out[subwindow - 1] = 0.0;
644    }
645}
646
647/// Computes rolling arithmetic means over integer data with high-precision floating-point output.
648#[inline]
649pub fn rolling_mean_int<T: NumCast + Copy + Zero>(
650    window: IntegerAVT<'_, T>,
651    subwindow: usize,
652) -> FloatArray<f64> {
653    let (arr, offset, len) = window;
654    let data = &arr.data[offset..offset + len];
655    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
656    let mut out = prealloc_vec::<f64>(len);
657    let mut out_mask = new_null_mask(len);
658    rolling_mean_int_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
659    FloatArray {
660        data: out.into(),
661        null_mask: Some(out_mask),
662    }
663}
664
665/// Zero-allocation variant that writes directly to caller-provided output buffers.
666#[inline]
667pub fn rolling_mean_float_to<T: Float + Copy + Zero>(
668    data: &[T],
669    mask: Option<&Bitmask>,
670    subwindow: usize,
671    out: &mut [T],
672    out_mask: &mut Bitmask,
673) {
674    let n = data.len();
675    if subwindow == 0 {
676        return;
677    }
678    for i in 0..n {
679        if i + 1 < subwindow {
680            unsafe { out_mask.set_unchecked(i, false) };
681            out[i] = T::zero();
682            continue;
683        }
684        let start = i + 1 - subwindow;
685        let mut sum = T::zero();
686        let mut valid = true;
687        for j in start..=i {
688            if mask.map_or(true, |m| unsafe { m.get_unchecked(j) }) {
689                sum = sum + data[j];
690            } else {
691                valid = false;
692                break;
693            }
694        }
695        unsafe { out_mask.set_unchecked(i, valid) };
696        out[i] = if valid {
697            sum / T::from(subwindow as u32).unwrap()
698        } else {
699            T::zero()
700        };
701    }
702    if subwindow > 0 && subwindow - 1 < out.len() {
703        unsafe { out_mask.set_unchecked(subwindow - 1, false) };
704        out[subwindow - 1] = T::zero();
705    }
706}
707
708/// Computes rolling arithmetic means over floating-point data with IEEE 754 compliance.
709#[inline]
710pub fn rolling_mean_float<T: Float + Copy + Zero>(
711    window: FloatAVT<'_, T>,
712    subwindow: usize,
713) -> FloatArray<T> {
714    let (arr, offset, len) = window;
715    let data = &arr.data[offset..offset + len];
716    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
717    let mut out = prealloc_vec::<T>(len);
718    let mut out_mask = new_null_mask(len);
719    rolling_mean_float_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
720    FloatArray {
721        data: out.into(),
722        null_mask: Some(out_mask),
723    }
724}
725
726/// Zero-allocation variant that writes directly to caller-provided output buffers.
727#[inline]
728pub fn rolling_min_int_to<T: Ord + Copy + Zero>(
729    data: &[T],
730    mask: Option<&Bitmask>,
731    subwindow: usize,
732    out: &mut [T],
733    out_mask: &mut Bitmask,
734) {
735    rolling_extreme_to(
736        data,
737        mask,
738        subwindow,
739        |a, b| a < b,
740        T::zero(),
741        out,
742        out_mask,
743    );
744    if subwindow > 0 && subwindow - 1 < out.len() {
745        out_mask.set(subwindow - 1, false);
746        out[subwindow - 1] = T::zero();
747    }
748}
749
750/// Computes rolling minimum values over integer data within sliding windows.
751#[inline]
752pub fn rolling_min_int<T: Ord + Copy + Zero>(
753    window: IntegerAVT<'_, T>,
754    subwindow: usize,
755) -> IntegerArray<T> {
756    let (arr, offset, len) = window;
757    let data = &arr.data[offset..offset + len];
758    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
759    let mut out = prealloc_vec::<T>(len);
760    let mut out_mask = new_null_mask(len);
761    rolling_min_int_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
762    IntegerArray {
763        data: out.into(),
764        null_mask: Some(out_mask),
765    }
766}
767
768/// Zero-allocation variant that writes directly to caller-provided output buffers.
769#[inline]
770pub fn rolling_max_int_to<T: Ord + Copy + Zero>(
771    data: &[T],
772    mask: Option<&Bitmask>,
773    subwindow: usize,
774    out: &mut [T],
775    out_mask: &mut Bitmask,
776) {
777    rolling_extreme_to(
778        data,
779        mask,
780        subwindow,
781        |a, b| a > b,
782        T::zero(),
783        out,
784        out_mask,
785    );
786}
787
788/// Computes rolling maximum values over integer data within sliding windows.
789#[inline]
790pub fn rolling_max_int<T: Ord + Copy + Zero>(
791    window: IntegerAVT<'_, T>,
792    subwindow: usize,
793) -> IntegerArray<T> {
794    let (arr, offset, len) = window;
795    let data = &arr.data[offset..offset + len];
796    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
797    let mut out = prealloc_vec::<T>(len);
798    let mut out_mask = new_null_mask(len);
799    rolling_max_int_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
800    IntegerArray {
801        data: out.into(),
802        null_mask: Some(out_mask),
803    }
804}
805
806/// Zero-allocation variant that writes directly to caller-provided output buffers.
807#[inline]
808pub fn rolling_min_float_to<T: Float + Copy + Zero>(
809    data: &[T],
810    mask: Option<&Bitmask>,
811    subwindow: usize,
812    out: &mut [T],
813    out_mask: &mut Bitmask,
814) {
815    rolling_extreme_to(
816        data,
817        mask,
818        subwindow,
819        |a, b| a < b,
820        T::zero(),
821        out,
822        out_mask,
823    );
824    if subwindow > 0 && subwindow - 1 < out.len() {
825        out_mask.set(subwindow - 1, false);
826        out[subwindow - 1] = T::zero();
827    }
828}
829
830/// Computes rolling minimum values over floating-point data with IEEE 754 compliance.
831#[inline]
832pub fn rolling_min_float<T: Float + Copy + Zero>(
833    window: FloatAVT<'_, T>,
834    subwindow: usize,
835) -> FloatArray<T> {
836    let (arr, offset, len) = window;
837    let data = &arr.data[offset..offset + len];
838    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
839    let mut out = prealloc_vec::<T>(len);
840    let mut out_mask = new_null_mask(len);
841    rolling_min_float_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
842    FloatArray {
843        data: out.into(),
844        null_mask: Some(out_mask),
845    }
846}
847
848/// Zero-allocation variant that writes directly to caller-provided output buffers.
849#[inline]
850pub fn rolling_max_float_to<T: Float + Copy + Zero>(
851    data: &[T],
852    mask: Option<&Bitmask>,
853    subwindow: usize,
854    out: &mut [T],
855    out_mask: &mut Bitmask,
856) {
857    rolling_extreme_to(
858        data,
859        mask,
860        subwindow,
861        |a, b| a > b,
862        T::zero(),
863        out,
864        out_mask,
865    );
866}
867
868/// Computes rolling maximum values over floating-point data with IEEE 754 compliance.
869#[inline]
870pub fn rolling_max_float<T: Float + Copy + Zero>(
871    window: FloatAVT<'_, T>,
872    subwindow: usize,
873) -> FloatArray<T> {
874    let (arr, offset, len) = window;
875    let data = &arr.data[offset..offset + len];
876    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
877    let mut out = prealloc_vec::<T>(len);
878    let mut out_mask = new_null_mask(len);
879    rolling_max_float_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
880    FloatArray {
881        data: out.into(),
882        null_mask: Some(out_mask),
883    }
884}
885
886/// Computes rolling counts of elements within sliding windows for positional analysis.
887///
888/// Counts the number of elements present within each sliding window position, providing
889/// fundamental cardinality information essential for statistical analysis and data validation.
890/// Unlike other rolling functions, operates on position information rather than data values.
891///
892/// ## Parameters
893/// * `window` - Tuple containing offset and length defining the data window scope
894/// * `subwindow` - Size of sliding window for element counting
895///
896/// ## Returns
897/// Returns an `IntegerArray<i32>` containing:
898/// - Count of elements in each complete window (always equals subwindow size)
899/// - Zero values for positions with incomplete windows
900/// - Null mask indicating where complete windows exist
901///
902/// ## Examples
903/// ```rust,ignore
904/// use simd_kernels::kernels::window::rolling_count;
905///
906/// let result = rolling_count((0, 5), 3); // 5 elements, window size 3
907/// ```
908#[inline]
909/// Zero-allocation variant that writes directly to caller-provided output buffers.
910#[inline]
911pub fn rolling_count_to(len: usize, subwindow: usize, out: &mut [i32], out_mask: &mut Bitmask) {
912    for i in 0..len {
913        let start = if i + 1 >= subwindow {
914            i + 1 - subwindow
915        } else {
916            0
917        };
918        let count = (i - start + 1) as i32;
919        let valid_row = subwindow > 0 && i + 1 >= subwindow;
920        unsafe { out_mask.set_unchecked(i, valid_row) };
921        out[i] = if valid_row { count } else { 0 };
922    }
923}
924
925pub fn rolling_count(window: (Offset, Length), subwindow: usize) -> IntegerArray<i32> {
926    let (_offset, len) = window;
927    let mut out = prealloc_vec::<i32>(len);
928    let mut out_mask = new_null_mask(len);
929    for i in 0..len {
930        let start = if i + 1 >= subwindow {
931            i + 1 - subwindow
932        } else {
933            0
934        };
935        let count = (i - start + 1) as i32;
936        let valid_row = subwindow > 0 && i + 1 >= subwindow;
937        unsafe { out_mask.set_unchecked(i, valid_row) };
938        out[i] = if valid_row { count } else { 0 };
939    }
940    IntegerArray {
941        data: out.into(),
942        null_mask: Some(out_mask),
943    }
944}
945
946// Rank and Dense-rank kernels
947
948#[inline(always)]
949fn rank_numeric_to<T, F>(
950    data: &[T],
951    mask: Option<&Bitmask>,
952    mut cmp: F,
953    out: &mut [i32],
954    out_mask: &mut Bitmask,
955) where
956    T: Copy,
957    F: FnMut(&T, &T) -> std::cmp::Ordering,
958{
959    let n = data.len();
960    let mut indices: Vec<usize> = (0..n).collect();
961    indices.sort_by(|&i, &j| cmp(&data[i], &data[j]));
962
963    for (rank, &i) in indices.iter().enumerate() {
964        if mask.map_or(true, |m| unsafe { m.get_unchecked(i) }) {
965            out[i] = (rank + 1) as i32;
966            unsafe { out_mask.set_unchecked(i, true) };
967        }
968    }
969}
970
971#[inline(always)]
972fn rank_numeric<T, F>(data: &[T], mask: Option<&Bitmask>, cmp: F) -> IntegerArray<i32>
973where
974    T: Copy,
975    F: FnMut(&T, &T) -> std::cmp::Ordering,
976{
977    let n = data.len();
978    let mut out = vec64![0i32; n];
979    let mut out_mask = Bitmask::new_set_all(n, false);
980    rank_numeric_to(data, mask, cmp, &mut out, &mut out_mask);
981    IntegerArray {
982        data: out.into(),
983        null_mask: Some(out_mask),
984    }
985}
986
987/// Computes standard SQL ROW_NUMBER() ranking for integer data with tie handling.
988///
989/// Assigns sequential rank values to elements based on their sorted order, providing
990/// standard SQL ROW_NUMBER() semantics where tied values receive different ranks.
991/// Essential for analytical queries requiring unique positional ranking.
992///
993/// ## Parameters
994/// * `window` - Integer array view containing values for ranking
995///
996/// ## Returns
997/// Returns an `IntegerArray<i32>` containing:
998/// - Rank values from 1 to n for valid elements
999/// - Zero values for null elements
1000/// - Null mask indicating which positions have valid ranks
1001///
1002/// ## Ranking Semantics
1003/// - **ROW_NUMBER() behaviour**: Each element receives a unique rank (1, 2, 3, ...)
1004/// - **Tie breaking**: Tied values receive different ranks based on their position
1005/// - **Ascending order**: Smaller values receive lower (better) ranks
1006/// - **Null exclusion**: Null values are excluded from ranking and receive rank 0
1007///
1008/// ## Use Cases
1009/// - **Analytical queries**: SQL ROW_NUMBER() window function implementation
1010/// - **Leaderboards**: Creating ordered rankings with unique positions
1011/// - **Percentile calculation**: Basis for percentile and quartile computations
1012/// - **Data analysis**: Establishing ordinality in integer datasets
1013///
1014/// ## Examples
1015/// ```rust,ignore
1016/// use minarrow::IntegerArray;
1017/// use simd_kernels::kernels::window::rank_int;
1018///
1019/// let arr = IntegerArray::<i32>::from_slice(&[30, 10, 20, 10]);
1020/// let result = rank_int((&arr, 0, arr.len()));
1021/// // Output: [4, 1, 3, 2] - ROW_NUMBER() style ranking
1022/// ```
1023/// Zero-allocation variant that writes directly to caller-provided output buffers.
1024#[inline(always)]
1025pub fn rank_int_to<T: Ord + Copy>(
1026    data: &[T],
1027    mask: Option<&Bitmask>,
1028    out: &mut [i32],
1029    out_mask: &mut Bitmask,
1030) {
1031    rank_numeric_to(data, mask, T::cmp, out, out_mask);
1032}
1033
1034/// Zero-allocation variant that writes directly to caller-provided output buffers.
1035#[inline(always)]
1036pub fn rank_float_to<T: Float + Copy>(
1037    data: &[T],
1038    mask: Option<&Bitmask>,
1039    out: &mut [i32],
1040    out_mask: &mut Bitmask,
1041) {
1042    rank_numeric_to(data, mask, total_cmp_f, out, out_mask);
1043}
1044
1045#[inline(always)]
1046pub fn rank_int<T: Ord + Copy>(window: IntegerAVT<T>) -> IntegerArray<i32> {
1047    let (arr, offset, len) = window;
1048    let data = &arr.data[offset..offset + len];
1049    let null_mask = if len != arr.data.len() {
1050        &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1051    } else {
1052        &arr.null_mask
1053    };
1054    rank_numeric(data, null_mask.as_ref(), |a, b| a.cmp(b))
1055}
1056
1057/// Computes standard SQL ROW_NUMBER() ranking for floating-point data with IEEE 754 compliance.
1058///
1059/// Assigns sequential rank values based on sorted floating-point order, implementing
1060/// ROW_NUMBER() semantics with proper handling of special floating-point values (NaN,
1061/// infinity) according to IEEE 754 comparison standards.
1062///
1063/// ## Parameters
1064/// * `window` - Float array view containing values for ranking
1065///
1066/// ## Returns
1067/// Returns an `IntegerArray<i32>` containing:
1068/// - Rank values from 1 to n for valid, non-NaN elements
1069/// - Zero values for null or NaN elements
1070/// - Null mask indicating positions with valid ranks
1071///
1072/// ## Floating-Point Ranking
1073/// - **IEEE 754 ordering**: Uses IEEE 754 compliant comparison operations
1074/// - **NaN handling**: NaN values are excluded from ranking (receive rank 0)
1075/// - **Infinity treatment**: Positive/negative infinity participate in ranking
1076/// - **Precision preservation**: Maintains full floating-point comparison precision
1077///
1078/// ## Ranking Semantics
1079/// - **ROW_NUMBER() style**: Each non-NaN element receives unique sequential rank
1080/// - **Ascending order**: Smaller floating-point values receive lower ranks
1081/// - **Tie breaking**: Floating-point ties broken by original array position
1082/// - **Special value exclusion**: NaN and null values excluded from rank assignment
1083///
1084/// ## Applications
1085/// - **Statistical ranking**: Ranking continuous numerical data
1086/// - **Scientific analysis**: Ordered ranking of experimental measurements
1087/// - **Financial analysis**: Ranking performance metrics and indicators
1088/// - **Data preprocessing**: Establishing ordinality for regression analysis
1089///
1090/// ## Examples
1091/// ```rust,ignore
1092/// use minarrow::FloatArray;
1093/// use simd_kernels::kernels::window::rank_float;
1094///
1095/// let arr = FloatArray::<f64>::from_slice(&[3.14, 2.71, 1.41, f64::NAN]);
1096/// let result = rank_float((&arr, 0, arr.len()));
1097/// // Output: [3, 2, 1, 0] - NaN excluded, others ranked by value
1098/// ```
1099#[inline(always)]
1100pub fn rank_float<T: Float + Copy>(window: FloatAVT<T>) -> IntegerArray<i32> {
1101    let (arr, offset, len) = window;
1102    let data = &arr.data[offset..offset + len];
1103    let null_mask = if len != arr.data.len() {
1104        &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1105    } else {
1106        &arr.null_mask
1107    };
1108    rank_numeric(data, null_mask.as_ref(), total_cmp_f)
1109}
1110
1111/// Computes standard SQL ROW_NUMBER() ranking for string data with lexicographic ordering.
1112///
1113/// Assigns sequential rank values based on lexicographic string comparison, implementing
1114/// ROW_NUMBER() semantics for textual data. Essential for alphabetical ranking and
1115/// string-based analytical operations.
1116///
1117/// ## Parameters
1118/// * `arr` - String array view containing textual values for ranking
1119///
1120/// ## Returns
1121/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1122/// - **Success**: Rank values from 1 to n for valid string elements
1123/// - **Error**: KernelError if capacity validation fails
1124/// - Zero values for null string elements
1125/// - Null mask indicating positions with valid ranks
1126///
1127/// ## String Ranking Semantics
1128/// - **Lexicographic order**: Uses standard string comparison (dictionary order)
1129/// - **Case sensitivity**: Comparisons are case-sensitive ("A" < "a")
1130/// - **Unicode support**: Proper handling of UTF-8 encoded string data
1131/// - **ROW_NUMBER() behaviour**: Tied strings receive different ranks by position
1132///
1133/// ## Error Conditions
1134/// - **Capacity errors**: Returns KernelError if mask capacity validation fails
1135/// - **Memory allocation**: May fail with insufficient memory for large datasets
1136///
1137/// ## Use Cases
1138/// - **Alphabetical ranking**: Creating alphabetically ordered rankings
1139/// - **Text analysis**: Establishing lexicographic ordinality in textual data
1140/// - **Database operations**: SQL ROW_NUMBER() implementation for string columns
1141/// - **Sorting applications**: Providing ranking information for string sorting
1142///
1143/// ## Examples
1144/// ```rust,ignore
1145/// use minarrow::StringArray;
1146/// use simd_kernels::kernels::window::rank_str;
1147///
1148/// let arr = StringArray::<u32>::from_slice(&["zebra", "apple", "banana"]);
1149/// let result = rank_str((&arr, 0, arr.len())).unwrap();
1150/// // Output: [3, 1, 2] - lexicographic ranking
1151/// ```
1152#[inline(always)]
1153pub fn rank_str<T: Integer>(arr: StringAVT<T>) -> Result<IntegerArray<i32>, KernelError> {
1154    let (array, offset, len) = arr;
1155    let mask = array.null_mask.as_ref();
1156    let _ = confirm_mask_capacity(array.len(), mask)?;
1157
1158    // Gather (local_idx, string) pairs for valid elements in the window
1159    let mut tuples: Vec<(usize, &str)> = (0..len)
1160        .filter(|&i| mask.map_or(true, |m| unsafe { m.get_unchecked(offset + i) }))
1161        .map(|i| (i, unsafe { array.get_unchecked(offset + i) }.unwrap_or("")))
1162        .collect();
1163
1164    // Sort by string value
1165    tuples.sort_by(|a, b| a.1.cmp(&b.1));
1166
1167    let mut out = vec64![0i32; len];
1168    let mut out_mask = new_null_mask(len);
1169
1170    // Assign ranks (1-based), using local output indices
1171    for (rank, (i, _)) in tuples.iter().enumerate() {
1172        out[*i] = (rank + 1) as i32;
1173        unsafe { out_mask.set_unchecked(*i, true) };
1174    }
1175
1176    Ok(IntegerArray {
1177        data: out.into(),
1178        null_mask: Some(out_mask),
1179    })
1180}
1181
1182#[inline(always)]
1183fn dense_rank_numeric_to<T, F, G>(
1184    data: &[T],
1185    mask: Option<&Bitmask>,
1186    mut sort: F,
1187    mut eq: G,
1188    out: &mut [i32],
1189    out_mask: &mut Bitmask,
1190) -> Result<(), KernelError>
1191where
1192    T: Copy,
1193    F: FnMut(&T, &T) -> std::cmp::Ordering,
1194    G: FnMut(&T, &T) -> bool,
1195{
1196    let n = data.len();
1197    let _ = confirm_mask_capacity(n, mask)?;
1198    let mut uniqs: Vec<T> = (0..n)
1199        .filter(|&i| mask.map_or(true, |m| unsafe { m.get_unchecked(i) }))
1200        .map(|i| data[i])
1201        .collect();
1202
1203    uniqs.sort_by(&mut sort);
1204    uniqs.dedup_by(|a, b| eq(&*a, &*b));
1205
1206    for i in 0..n {
1207        if mask.map_or(true, |m| unsafe { m.get_unchecked(i) }) {
1208            let rank = uniqs.binary_search_by(|x| sort(x, &data[i])).unwrap() + 1;
1209            out[i] = rank as i32;
1210            unsafe { out_mask.set_unchecked(i, true) };
1211        } else {
1212            out[i] = 0;
1213        }
1214    }
1215
1216    Ok(())
1217}
1218
1219#[inline(always)]
1220fn dense_rank_numeric<T, F, G>(
1221    data: &[T],
1222    mask: Option<&Bitmask>,
1223    sort: F,
1224    eq: G,
1225) -> Result<IntegerArray<i32>, KernelError>
1226where
1227    T: Copy,
1228    F: FnMut(&T, &T) -> std::cmp::Ordering,
1229    G: FnMut(&T, &T) -> bool,
1230{
1231    let n = data.len();
1232    let mut out = prealloc_vec::<i32>(n);
1233    let mut out_mask = Bitmask::new_set_all(n, false);
1234    dense_rank_numeric_to(data, mask, sort, eq, &mut out, &mut out_mask)?;
1235    Ok(IntegerArray {
1236        data: out.into(),
1237        null_mask: Some(out_mask),
1238    })
1239}
1240
1241/// Computes SQL DENSE_RANK() ranking for integer data with consecutive rank assignment.
1242///
1243/// Assigns consecutive rank values where tied values receive identical ranks, implementing
1244/// SQL DENSE_RANK() semantics. Unlike standard ranking, eliminates gaps in rank sequence
1245/// when ties occur, providing compact rank numbering for analytical applications.
1246///
1247/// ## Parameters
1248/// * `window` - Integer array view containing values for dense ranking
1249///
1250/// ## Returns
1251/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1252/// - **Success**: Dense rank values with no gaps in sequence
1253/// - **Error**: KernelError if capacity validation fails
1254/// - Zero values for null elements
1255/// - Null mask indicating positions with valid ranks
1256///
1257/// ## Dense Ranking Semantics
1258/// - **DENSE_RANK() behaviour**: Tied values receive same rank, next rank is consecutive
1259/// - **No rank gaps**: Eliminates gaps that occur in standard RANK() function
1260/// - **Unique value counting**: Essentially counts distinct values in sorted order
1261/// - **Ascending order**: Smaller integer values receive lower (better) ranks
1262///
1263/// ## Comparison with RANK()
1264/// - **RANK()**: [1, 2, 2, 4] for values [10, 20, 20, 30]
1265/// - **DENSE_RANK()**: [1, 2, 2, 3] for values [10, 20, 20, 30]
1266///
1267/// ## Use Cases
1268/// - **Analytical queries**: SQL DENSE_RANK() window function implementation
1269/// - **Categorical ranking**: Creating compact categorical orderings
1270/// - **Percentile calculation**: Foundation for percentile computations without gaps
1271/// - **Data binning**: Assigning data points to consecutive bins based on value
1272///
1273/// ## Examples
1274/// ```rust,ignore
1275/// use minarrow::IntegerArray;
1276/// use simd_kernels::kernels::window::dense_rank_int;
1277///
1278/// let arr = IntegerArray::<i32>::from_slice(&[10, 30, 20, 30]);
1279/// let result = dense_rank_int((&arr, 0, arr.len())).unwrap();
1280/// // Output: [1, 3, 2, 3] - dense ranking with tied values
1281/// ```
1282#[inline(always)]
1283/// Zero-allocation variant that writes directly to caller-provided output buffers.
1284#[inline(always)]
1285pub fn dense_rank_int_to<T: Ord + Copy>(
1286    data: &[T],
1287    mask: Option<&Bitmask>,
1288    out: &mut [i32],
1289    out_mask: &mut Bitmask,
1290) -> Result<(), KernelError> {
1291    dense_rank_numeric_to(data, mask, T::cmp, |a, b| a == b, out, out_mask)
1292}
1293
1294/// Zero-allocation variant that writes directly to caller-provided output buffers.
1295#[inline(always)]
1296pub fn dense_rank_float_to<T: Float + Copy>(
1297    data: &[T],
1298    mask: Option<&Bitmask>,
1299    out: &mut [i32],
1300    out_mask: &mut Bitmask,
1301) -> Result<(), KernelError> {
1302    dense_rank_numeric_to(data, mask, total_cmp_f, |a, b| a == b, out, out_mask)
1303}
1304
1305pub fn dense_rank_int<T: Ord + Copy>(
1306    window: IntegerAVT<T>,
1307) -> Result<IntegerArray<i32>, KernelError> {
1308    let (arr, offset, len) = window;
1309    let data = &arr.data[offset..offset + len];
1310    let null_mask = if len != arr.data.len() {
1311        &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1312    } else {
1313        &arr.null_mask
1314    };
1315    dense_rank_numeric(data, null_mask.as_ref(), |a, b| a.cmp(b), |a, b| a == b)
1316}
1317
1318/// Computes SQL DENSE_RANK() ranking for floating-point data with IEEE 754 compliance.
1319///
1320/// Implements dense ranking for floating-point values where tied values receive identical
1321/// consecutive ranks. Handles special floating-point values (NaN, infinity) according
1322/// to IEEE 754 standards while maintaining dense rank sequence properties.
1323///
1324/// ## Parameters
1325/// * `window` - Float array view containing values for dense ranking
1326///
1327/// ## Returns
1328/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1329/// - **Success**: Dense rank values with consecutive numbering
1330/// - **Error**: KernelError if capacity validation fails
1331/// - Zero values for null or NaN elements
1332/// - Null mask indicating positions with valid ranks
1333///
1334/// ## Applications
1335/// - **Scientific ranking**: Dense ranking of experimental measurements
1336/// - **Statistical analysis**: Percentile calculations without rank gaps
1337/// - **Financial modeling**: Dense ranking of performance metrics
1338/// - **Data preprocessing**: Creating ordinal encodings for continuous variables
1339///
1340/// ## Examples
1341/// ```rust,ignore
1342/// use minarrow::FloatArray;
1343/// use simd_kernels::kernels::window::dense_rank_float;
1344///
1345/// let arr = FloatArray::<f64>::from_slice(&[1.5, 3.14, 2.71, 3.14]);
1346/// let result = dense_rank_float((&arr, 0, arr.len())).unwrap();
1347/// // Output: [1, 3, 2, 3] - dense ranking with tied 3.14 values
1348/// ```
1349#[inline(always)]
1350pub fn dense_rank_float<T: Float + Copy>(
1351    window: FloatAVT<T>,
1352) -> Result<IntegerArray<i32>, KernelError> {
1353    let (arr, offset, len) = window;
1354    let data = &arr.data[offset..offset + len];
1355    let null_mask = if len != arr.data.len() {
1356        &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1357    } else {
1358        &arr.null_mask
1359    };
1360    dense_rank_numeric(data, null_mask.as_ref(), total_cmp_f, |a, b| a == b)
1361}
1362
1363/// Computes SQL DENSE_RANK() ranking for string data with lexicographic dense ordering.
1364///
1365/// Implements dense ranking for string values using lexicographic comparison, where
1366/// identical strings receive the same rank and subsequent ranks remain consecutive.
1367/// Essential for alphabetical dense ranking and textual categorical analysis.
1368///
1369/// ## Parameters
1370/// * `arr` - String array view containing textual values for dense ranking
1371///
1372/// ## Returns
1373/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1374/// - **Success**: Dense rank values with consecutive sequence
1375/// - **Error**: KernelError if capacity validation fails
1376/// - Zero values for null string elements
1377/// - Null mask indicating positions with valid ranks
1378///
1379/// ## Dense String Ranking
1380/// - **DENSE_RANK() semantics**: Identical strings receive same rank, no rank gaps
1381/// - **Lexicographic ordering**: Standard dictionary-style string comparison
1382/// - **Case sensitivity**: Maintains case-sensitive comparison ("Apple" ≠ "apple")
1383/// - **UTF-8 support**: Proper handling of Unicode string sequences
1384///
1385/// ## Use Cases
1386/// - **Alphabetical dense ranking**: Creating compact alphabetical orderings
1387/// - **Categorical encoding**: Converting string categories to dense integer codes
1388/// - **Text analytics**: Establishing lexicographic ordinality for text processing
1389/// - **Database operations**: SQL DENSE_RANK() for string-valued columns
1390///
1391/// ## Examples
1392/// ```rust,ignore
1393/// use minarrow::StringArray;
1394/// use simd_kernels::kernels::window::dense_rank_str;
1395///
1396/// let arr = StringArray::<u32>::from_slice(&["banana", "apple", "cherry", "apple"]);
1397/// let result = dense_rank_str((&arr, 0, arr.len())).unwrap();
1398/// // Output: [2, 1, 3, 1] - dense ranking with tied "apple" values
1399/// ```
1400#[inline(always)]
1401pub fn dense_rank_str<T: Integer>(arr: StringAVT<T>) -> Result<IntegerArray<i32>, KernelError> {
1402    let (array, offset, len) = arr;
1403    let mask = array.null_mask.as_ref();
1404    let _ = confirm_mask_capacity(array.len(), mask)?;
1405
1406    // Collect all unique valid values in window
1407    let mut vals: Vec<&str> = (0..len)
1408        .filter(|&i| mask.map_or(true, |m| unsafe { m.get_unchecked(offset + i) }))
1409        .map(|i| unsafe { array.get_unchecked(offset + i) }.unwrap_or(""))
1410        .collect();
1411    vals.sort();
1412    vals.dedup();
1413
1414    let mut out = prealloc_vec::<i32>(len);
1415    let mut out_mask = Bitmask::new_set_all(len, false);
1416
1417    for i in 0..len {
1418        let valid = mask.map_or(true, |m| unsafe { m.get_unchecked(offset + i) });
1419        if valid {
1420            let rank = vals
1421                .binary_search(&unsafe { array.get_unchecked(offset + i) }.unwrap_or(""))
1422                .unwrap()
1423                + 1;
1424            out[i] = rank as i32;
1425            unsafe { out_mask.set_unchecked(i, true) };
1426        } else {
1427            out[i] = 0;
1428        }
1429    }
1430
1431    Ok(IntegerArray {
1432        data: out.into(),
1433        null_mask: Some(out_mask),
1434    })
1435}
1436
1437// Lag / Lead / Shift kernels
1438
1439/// Zero-allocation variant that writes directly to caller-provided output buffers.
1440///
1441/// Panics if `out.len() != len`.
1442#[inline(always)]
1443fn shift_with_bounds_to<T: Copy>(
1444    data: &[T],
1445    mask: Option<&Bitmask>,
1446    len: usize,
1447    offset_fn: impl Fn(usize) -> Option<usize>,
1448    default: T,
1449    out: &mut [T],
1450    out_mask: &mut Bitmask,
1451) {
1452    assert_eq!(
1453        len,
1454        out.len(),
1455        "shift_with_bounds_to: input/output length mismatch"
1456    );
1457    for i in 0..len {
1458        if let Some(j) = offset_fn(i) {
1459            out[i] = data[j];
1460            let is_valid = mask.map_or(true, |m| unsafe { m.get_unchecked(j) });
1461            unsafe { out_mask.set_unchecked(i, is_valid) };
1462        } else {
1463            out[i] = default;
1464        }
1465    }
1466}
1467
1468/// Allocating variant: creates new output buffers internally.
1469#[inline(always)]
1470fn shift_with_bounds<T: Copy>(
1471    data: &[T],
1472    mask: Option<&Bitmask>,
1473    len: usize,
1474    offset_fn: impl Fn(usize) -> Option<usize>,
1475    default: T,
1476) -> (Vec64<T>, Bitmask) {
1477    let mut out = prealloc_vec::<T>(len);
1478    let mut out_mask = Bitmask::new_set_all(len, false);
1479    shift_with_bounds_to(data, mask, len, offset_fn, default, &mut out, &mut out_mask);
1480    (out, out_mask)
1481}
1482
1483#[inline(always)]
1484fn shift_str_with_bounds<T: Integer>(
1485    arr: StringAVT<T>,
1486    offset_fn: impl Fn(usize) -> Option<usize>,
1487) -> Result<StringArray<T>, KernelError> {
1488    let (array, offset, len) = arr;
1489    let src_mask = array.null_mask.as_ref();
1490    let _ = confirm_mask_capacity(array.len(), src_mask)?;
1491
1492    // Determine offsets and total bytes required
1493    let mut offsets = Vec64::<T>::with_capacity(len + 1);
1494    unsafe {
1495        offsets.set_len(len + 1);
1496    }
1497    offsets[0] = T::zero();
1498
1499    let mut total_bytes = 0;
1500    let mut string_lengths = vec![0usize; len];
1501
1502    for i in 0..len {
1503        let byte_len = if let Some(j) = offset_fn(i) {
1504            let src_idx = offset + j;
1505            let valid = src_mask.map_or(true, |m| unsafe { m.get_unchecked(src_idx) });
1506            if valid {
1507                unsafe { array.get_unchecked(src_idx).unwrap_or("").len() }
1508            } else {
1509                0
1510            }
1511        } else {
1512            0
1513        };
1514        total_bytes += byte_len;
1515        string_lengths[i] = byte_len;
1516        offsets[i + 1] = T::from_usize(total_bytes);
1517    }
1518
1519    // Allocate data buffer
1520    let mut data = Vec64::<u8>::with_capacity(total_bytes);
1521    let mut out_mask = Bitmask::new_set_all(len, false);
1522
1523    // Write string content
1524    for i in 0..len {
1525        if let Some(j) = offset_fn(i) {
1526            let src_idx = offset + j;
1527            let valid = src_mask.map_or(true, |m| unsafe { m.get_unchecked(src_idx) });
1528            if valid {
1529                let s = unsafe { array.get_unchecked(src_idx).unwrap_or("") };
1530                data.extend_from_slice(s.as_bytes());
1531                unsafe { out_mask.set_unchecked(i, true) };
1532                continue;
1533            }
1534        }
1535        // Not valid or OOB write nothing
1536    }
1537
1538    Ok(StringArray {
1539        offsets: offsets.into(),
1540        data: data.into(),
1541        null_mask: Some(out_mask),
1542    })
1543}
1544
1545// Integer
1546
1547/// Accesses values from previous positions in integer arrays with configurable offset.
1548///
1549/// Implements SQL LAG() window function semantics, retrieving values from earlier positions
1550/// in the array sequence. Essential for time series analysis, trend detection, and
1551/// comparative analytics requiring access to historical data points.
1552///
1553/// ## Parameters
1554/// * `window` - Integer array view containing sequential data for lag access
1555/// * `n` - Lag offset specifying how many positions to look backward
1556///
1557/// ## Returns
1558/// Returns an `IntegerArray<T>` containing:
1559/// - Values from n positions earlier in the sequence
1560/// - Default values for positions where lag source is unavailable
1561/// - Null mask indicating validity of lagged values
1562///
1563/// ## Examples
1564/// ```rust,ignore
1565/// use minarrow::IntegerArray;
1566/// use simd_kernels::kernels::window::lag_int;
1567///
1568/// let arr = IntegerArray::<i32>::from_slice(&[10, 20, 30, 40]);
1569/// let result = lag_int((&arr, 0, arr.len()), 1);
1570/// ```
1571/// Zero-allocation variant that writes directly to caller-provided output buffers.
1572#[inline]
1573pub fn lag_int_to<T: Copy + Default>(
1574    data: &[T],
1575    mask: Option<&Bitmask>,
1576    n: usize,
1577    out: &mut [T],
1578    out_mask: &mut Bitmask,
1579) {
1580    let len = data.len();
1581    shift_with_bounds_to(
1582        data,
1583        mask,
1584        len,
1585        |i| if i >= n { Some(i - n) } else { None },
1586        T::default(),
1587        out,
1588        out_mask,
1589    );
1590}
1591
1592#[inline]
1593pub fn lag_int<T: Copy + Default>(window: IntegerAVT<T>, n: usize) -> IntegerArray<T> {
1594    let (arr, offset, len) = window;
1595    let data_window = &arr.data[offset..offset + len];
1596    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
1597    let mut out = prealloc_vec::<T>(len);
1598    let mut out_mask = Bitmask::new_set_all(len, false);
1599    lag_int_to(data_window, mask.as_ref(), n, &mut out, &mut out_mask);
1600    IntegerArray {
1601        data: out.into(),
1602        null_mask: Some(out_mask),
1603    }
1604}
1605
1606/// Accesses values from future positions in integer arrays with configurable offset.
1607///
1608/// Implements SQL LEAD() window function semantics, retrieving values from later positions
1609/// in the array sequence. Essential for predictive analytics, forward-looking comparisons,
1610/// and temporal analysis requiring access to future data points.
1611///
1612/// ## Parameters
1613/// * `window` - Integer array view containing sequential data for lead access
1614/// * `n` - Lead offset specifying how many positions to look forward
1615///
1616/// ## Returns
1617/// Returns an `IntegerArray<T>` containing:
1618/// - Values from n positions later in the sequence
1619/// - Default values for positions where lead source is unavailable
1620/// - Null mask indicating validity of lead values
1621///
1622/// ## Examples
1623/// ```rust,ignore
1624/// use minarrow::IntegerArray;
1625/// use simd_kernels::kernels::window::lead_int;
1626///
1627/// let arr = IntegerArray::<i32>::from_slice(&[10, 20, 30, 40]);
1628/// let result = lead_int((&arr, 0, arr.len()), 2);
1629/// ```
1630/// Zero-allocation variant that writes directly to caller-provided output buffers.
1631#[inline]
1632pub fn lead_int_to<T: Copy + Default>(
1633    data: &[T],
1634    mask: Option<&Bitmask>,
1635    n: usize,
1636    out: &mut [T],
1637    out_mask: &mut Bitmask,
1638) {
1639    let len = data.len();
1640    shift_with_bounds_to(
1641        data,
1642        mask,
1643        len,
1644        |i| if i + n < len { Some(i + n) } else { None },
1645        T::default(),
1646        out,
1647        out_mask,
1648    );
1649}
1650
1651#[inline]
1652pub fn lead_int<T: Copy + Default>(window: IntegerAVT<T>, n: usize) -> IntegerArray<T> {
1653    let (arr, offset, len) = window;
1654    let data_window = &arr.data[offset..offset + len];
1655    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
1656    let mut out = prealloc_vec::<T>(len);
1657    let mut out_mask = Bitmask::new_set_all(len, false);
1658    lead_int_to(data_window, mask.as_ref(), n, &mut out, &mut out_mask);
1659    IntegerArray {
1660        data: out.into(),
1661        null_mask: Some(out_mask),
1662    }
1663}
1664
1665/// Zero-allocation variant that writes directly to caller-provided output buffers.
1666#[inline]
1667pub fn lag_float_to<T: Copy + num_traits::Zero>(
1668    data: &[T],
1669    mask: Option<&Bitmask>,
1670    n: usize,
1671    out: &mut [T],
1672    out_mask: &mut Bitmask,
1673) {
1674    let len = data.len();
1675    shift_with_bounds_to(
1676        data,
1677        mask,
1678        len,
1679        |i| if i >= n { Some(i - n) } else { None },
1680        T::zero(),
1681        out,
1682        out_mask,
1683    );
1684}
1685
1686/// Zero-allocation variant that writes directly to caller-provided output buffers.
1687#[inline]
1688pub fn lead_float_to<T: Copy + num_traits::Zero>(
1689    data: &[T],
1690    mask: Option<&Bitmask>,
1691    n: usize,
1692    out: &mut [T],
1693    out_mask: &mut Bitmask,
1694) {
1695    let len = data.len();
1696    shift_with_bounds_to(
1697        data,
1698        mask,
1699        len,
1700        |i| if i + n < len { Some(i + n) } else { None },
1701        T::zero(),
1702        out,
1703        out_mask,
1704    );
1705}
1706
1707/// Accesses values from previous positions in floating-point arrays with IEEE 754 compliance.
1708#[inline]
1709pub fn lag_float<T: Copy + num_traits::Zero>(window: FloatAVT<T>, n: usize) -> FloatArray<T> {
1710    let (arr, offset, len) = window;
1711    let data_window = &arr.data[offset..offset + len];
1712    let mask_window = if len != arr.data.len() {
1713        arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1714    } else {
1715        arr.null_mask.clone()
1716    };
1717    let (data, null_mask) = shift_with_bounds(
1718        data_window,
1719        mask_window.as_ref(),
1720        len,
1721        |i| if i >= n { Some(i - n) } else { None },
1722        T::zero(),
1723    );
1724    FloatArray {
1725        data: data.into(),
1726        null_mask: Some(null_mask),
1727    }
1728}
1729
1730/// Accesses values from future positions in floating-point arrays with IEEE 754 compliance.
1731///
1732/// Implements SQL LEAD() function for floating-point data, retrieving values from later
1733/// positions while preserving IEEE 754 semantics. Essential for forward-looking analysis
1734/// and predictive modeling with continuous numerical data.
1735///
1736/// ## Parameters
1737/// * `window` - Float array view containing sequential floating-point data
1738/// * `n` - Lead offset specifying forward position distance
1739///
1740/// ## Returns
1741/// Returns a `FloatArray<T>` containing:
1742/// - Floating-point values from n positions later
1743/// - Zero values for positions beyond available future
1744/// - Null mask indicating lead validity and special value propagation
1745///
1746/// ## Use Cases
1747/// - **Predictive analytics**: Accessing future values for comparison and modeling
1748/// - **Signal analysis**: Forward-looking operations in digital signal processing
1749/// - **Financial modeling**: Computing forward returns and future value analysis
1750/// - **Scientific computing**: Implementing forward difference schemes
1751///
1752/// ## Examples
1753/// ```rust,ignore
1754/// use minarrow::FloatArray;
1755/// use simd_kernels::kernels::window::lead_float;
1756///
1757/// let arr = FloatArray::<f32>::from_slice(&[1.1, 2.2, 3.3, 4.4]);
1758/// let result = lead_float((&arr, 0, arr.len()), 2);
1759/// // Output: [3.3, 4.4, 0.0, 0.0] - lead by 2 positions
1760/// ```
1761#[inline]
1762pub fn lead_float<T: Copy + num_traits::Zero>(window: FloatAVT<T>, n: usize) -> FloatArray<T> {
1763    let (arr, offset, len) = window;
1764    let data_window = &arr.data[offset..offset + len];
1765    let mask_window = if len != arr.data.len() {
1766        arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1767    } else {
1768        arr.null_mask.clone()
1769    };
1770    let (data, null_mask) = shift_with_bounds(
1771        data_window,
1772        mask_window.as_ref(),
1773        len,
1774        |i| if i + n < len { Some(i + n) } else { None },
1775        T::zero(),
1776    );
1777    FloatArray {
1778        data: data.into(),
1779        null_mask: Some(null_mask),
1780    }
1781}
1782
1783// String
1784
1785/// Accesses string values from previous positions with UTF-8 string handling.
1786///
1787/// Implements SQL LAG() function for string data, retrieving textual values from earlier
1788/// positions in the array sequence. Essential for textual analysis, sequential string
1789/// processing, and comparative text analytics.
1790///
1791/// ## Parameters
1792/// * `arr` - String array view containing sequential textual data
1793/// * `n` - Lag offset specifying backward position distance
1794///
1795/// ## Returns
1796/// Returns `Result<StringArray<T>, KernelError>` containing:
1797/// - **Success**: String values from n positions earlier
1798/// - **Error**: KernelError if string processing fails
1799/// - Empty strings for positions with insufficient history
1800/// - Null mask indicating lag validity and source availability
1801///
1802/// ## String Lag Semantics
1803/// - **UTF-8 preservation**: Maintains proper UTF-8 encoding throughout operation
1804/// - **Null propagation**: Null strings in source positions result in null outputs
1805/// - **Memory management**: Efficient string copying and allocation strategies
1806/// - **Boundary handling**: Positions without history receive empty string defaults
1807///
1808/// ## Applications
1809/// - **Text analysis**: Comparing current text with previous entries
1810/// - **Sequential processing**: Analysing patterns in ordered textual data
1811/// - **Log analysis**: Accessing previous log entries for context
1812/// - **Natural language processing**: Context-aware text processing with history
1813///
1814/// ## Examples
1815/// ```rust,ignore
1816/// use minarrow::StringArray;
1817/// use simd_kernels::kernels::window::lag_str;
1818///
1819/// let arr = StringArray::<u32>::from_slice(&["first", "second", "third"]);
1820/// let result = lag_str((&arr, 0, arr.len()), 1).unwrap();
1821/// // Output: ["", "first", "second"] - strings lagged by 1 position
1822/// ```
1823#[inline]
1824pub fn lag_str<T: Integer>(arr: StringAVT<T>, n: usize) -> Result<StringArray<T>, KernelError> {
1825    shift_str_with_bounds(arr, |i| if i >= n { Some(i - n) } else { None })
1826}
1827
1828/// Accesses string values from future positions with efficient UTF-8 processing.
1829///
1830/// Implements SQL LEAD() function for string data, retrieving textual values from later
1831/// positions in the array sequence. Critical for forward-looking text analysis and
1832/// sequential string pattern recognition.
1833///
1834/// ## Parameters
1835/// * `arr` - String array view containing sequential textual data
1836/// * `n` - Lead offset specifying forward position distance
1837///
1838/// ## Returns
1839/// Returns `Result<StringArray<T>, KernelError>` containing:
1840/// - **Success**: String values from n positions later
1841/// - **Error**: KernelError if string processing encounters issues
1842/// - Empty strings for positions beyond available future
1843/// - Null mask indicating lead validity and source availability
1844///
1845/// ## Examples
1846/// ```rust,ignore
1847/// use minarrow::StringArray;
1848/// use simd_kernels::kernels::window::lead_str;
1849///
1850/// let arr = StringArray::<u32>::from_slice(&["alpha", "beta", "gamma"]);
1851/// let result = lead_str((&arr, 0, arr.len()), 1).unwrap();
1852/// ```
1853#[inline]
1854pub fn lead_str<T: Integer>(arr: StringAVT<T>, n: usize) -> Result<StringArray<T>, KernelError> {
1855    let (_array, _offset, len) = arr;
1856    shift_str_with_bounds(arr, |i| if i + n < len { Some(i + n) } else { None })
1857}
1858
1859// Shift variants
1860/// Shifts integer array elements by specified offset with bidirectional support.
1861///
1862/// Provides unified interface for both LAG and LEAD operations through signed offset
1863/// parameter. Positive offsets implement LEAD semantics (forward shift), negative
1864/// offsets implement LAG semantics (backward shift), enabling flexible positional access.
1865///
1866/// ## Parameters
1867/// * `window` - Integer array view containing data for shifting
1868/// * `offset` - Signed offset: positive for LEAD (forward), negative for LAG (backward), zero for identity
1869///
1870/// ## Returns
1871/// Returns an `IntegerArray<T>` containing:
1872/// - Shifted integer values according to offset direction
1873/// - Default values for positions beyond available data
1874/// - Null mask reflecting validity of shifted positions
1875///
1876/// ## Shift Semantics
1877/// - **Positive offset**: LEAD operation (shift left, access future values)
1878/// - **Negative offset**: LAG operation (shift right, access past values)
1879/// - **Zero offset**: Identity operation (returns original array)
1880/// - **Boundary handling**: Out-of-bounds positions receive default values
1881///
1882/// ## Applications
1883/// - **Time series analysis**: Flexible temporal shifting for comparison operations
1884/// - **Sequence processing**: Bidirectional access in ordered integer sequences
1885/// - **Algorithm implementation**: Building blocks for complex windowing operations
1886/// - **Data transformation**: Positional transformations in numerical datasets
1887///
1888/// ## Examples
1889/// ```rust,ignore
1890/// use minarrow::IntegerArray;
1891/// use simd_kernels::kernels::window::shift_int;
1892///
1893/// let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4]);
1894/// let lag = shift_int((&arr, 0, arr.len()), -1);  // LAG by 1
1895/// let lead = shift_int((&arr, 0, arr.len()), 2);  // LEAD by 2
1896/// // lag:  [0, 1, 2, 3] - previous values
1897/// // lead: [3, 4, 0, 0] - future values
1898/// ```
1899#[inline(always)]
1900pub fn shift_int<T: Copy + Default>(window: IntegerAVT<T>, offset: isize) -> IntegerArray<T> {
1901    let (arr, win_offset, win_len) = window;
1902    if offset == 0 {
1903        return IntegerArray {
1904            data: Vec64::from_slice(&arr.data[win_offset..win_offset + win_len]).into(),
1905            null_mask: if win_len != arr.data.len() {
1906                arr.null_mask
1907                    .as_ref()
1908                    .map(|m| m.slice_clone(win_offset, win_len))
1909            } else {
1910                arr.null_mask.clone()
1911            },
1912        };
1913    } else if offset > 0 {
1914        lead_int((arr, win_offset, win_len), offset as usize)
1915    } else {
1916        lag_int((arr, win_offset, win_len), offset.unsigned_abs())
1917    }
1918}
1919
1920/// Shifts floating-point array elements with IEEE 754 compliance and bidirectional support.
1921///
1922/// Unified shifting interface for floating-point data supporting both LAG and LEAD semantics
1923/// through signed offset parameter. Maintains IEEE 754 standards for special value handling
1924/// while providing efficient bidirectional positional access.
1925///
1926/// ## Parameters
1927/// * `window` - Float array view containing data for shifting
1928/// * `offset` - Signed offset: positive for LEAD (forward), negative for LAG (backward), zero for identity
1929///
1930/// ## Returns
1931/// Returns a `FloatArray<T>` containing:
1932/// - Shifted floating-point values preserving IEEE 754 semantics
1933/// - Zero values for positions beyond data boundaries
1934/// - Null mask indicating validity of shifted positions
1935///
1936/// ## Examples
1937/// ```rust,ignore
1938/// use minarrow::FloatArray;
1939/// use simd_kernels::kernels::window::shift_float;
1940///
1941/// let arr = FloatArray::<f64>::from_slice(&[1.1, 2.2, 3.3, 4.4]);
1942/// let backward = shift_float((&arr, 0, arr.len()), -2); // LAG by 2
1943/// let forward = shift_float((&arr, 0, arr.len()), 1);   // LEAD by 1
1944/// ```
1945#[inline(always)]
1946pub fn shift_float<T: Copy + num_traits::Zero>(
1947    window: FloatAVT<T>,
1948    offset: isize,
1949) -> FloatArray<T> {
1950    let (arr, win_offset, win_len) = window;
1951    if offset == 0 {
1952        return FloatArray {
1953            data: Vec64::from_slice(&arr.data[win_offset..win_offset + win_len]).into(),
1954            null_mask: if win_len != arr.data.len() {
1955                arr.null_mask
1956                    .as_ref()
1957                    .map(|m| m.slice_clone(win_offset, win_len))
1958            } else {
1959                arr.null_mask.clone()
1960            },
1961        };
1962    } else if offset > 0 {
1963        lead_float((arr, win_offset, win_len), offset as usize)
1964    } else {
1965        lag_float((arr, win_offset, win_len), offset.unsigned_abs())
1966    }
1967}
1968
1969/// Shifts string array elements with UTF-8 integrity and bidirectional offset support.
1970///
1971/// String shifting function supporting both LAG and LEAD operations through
1972/// signed offset parameter. Maintains UTF-8 encoding integrity while providing flexible
1973/// positional access for textual sequence analysis.
1974///
1975/// ## Parameters
1976/// * `arr` - String array view containing textual data for shifting
1977/// * `shift_offset` - Signed offset: positive for LEAD (forward), negative for LAG (backward), zero for identity
1978///
1979/// ## Returns
1980/// Returns `Result<StringArray<T>, KernelError>` containing:
1981/// - **Success**: Shifted string values maintaining UTF-8 integrity
1982/// - **Error**: KernelError if string processing encounters issues
1983/// - Empty strings for positions beyond data boundaries
1984/// - Null mask reflecting validity of shifted string positions
1985///
1986/// ## Shift Semantics
1987/// - **Positive offset**: LEAD operation accessing future string values
1988/// - **Negative offset**: LAG operation accessing historical string values
1989/// - **Zero offset**: Identity operation (returns cloned array slice)
1990/// - **Boundary conditions**: Out-of-range positions produce empty strings
1991///
1992/// ## Examples
1993/// ```rust,ignore
1994/// use minarrow::StringArray;
1995/// use simd_kernels::kernels::window::shift_str;
1996///
1997/// let arr = StringArray::<u32>::from_slice(&["one", "two", "three"]);
1998/// let back = shift_str((&arr, 0, arr.len()), -1).unwrap(); // LAG
1999/// let forward = shift_str((&arr, 0, arr.len()), 1).unwrap(); // LEAD
2000/// // back:    ["", "one", "two"]
2001/// // forward: ["two", "three", ""]
2002/// ```
2003#[inline(always)]
2004pub fn shift_str<T: Integer>(
2005    arr: StringAVT<T>,
2006    shift_offset: isize,
2007) -> Result<StringArray<T>, KernelError> {
2008    if shift_offset == 0 {
2009        // Return this slice's window as a cloned StringArray
2010        let (array, off, len) = arr;
2011        Ok(array.slice_clone(off, len))
2012    } else if shift_offset > 0 {
2013        lead_str(arr, shift_offset as usize)
2014    } else {
2015        lag_str(arr, shift_offset.unsigned_abs())
2016    }
2017}
2018
2019#[cfg(test)]
2020mod tests {
2021    use minarrow::structs::variants::float::FloatArray;
2022    use minarrow::structs::variants::integer::IntegerArray;
2023    use minarrow::structs::variants::string::StringArray;
2024    use minarrow::{Bitmask, BooleanArray};
2025
2026    use super::*;
2027
2028    // ─────────────────────────── Helpers ───────────────────────────
2029
2030    /// Build a `Bitmask` from booleans.
2031    fn bm(bits: &[bool]) -> Bitmask {
2032        let mut m = Bitmask::new_set_all(bits.len(), false);
2033        for (i, &b) in bits.iter().enumerate() {
2034            m.set(i, b);
2035        }
2036        m
2037    }
2038
2039    /// Simple equality for `IntegerArray<T>`
2040    fn expect_int<T: PartialEq + std::fmt::Debug + Clone>(
2041        arr: &IntegerArray<T>,
2042        values: &[T],
2043        valid: &[bool],
2044    ) {
2045        assert_eq!(arr.data.as_slice(), values);
2046        let mask = arr.null_mask.as_ref().expect("mask missing");
2047        for (i, &v) in valid.iter().enumerate() {
2048            assert_eq!(mask.get(i), v, "mask bit {}", i);
2049        }
2050    }
2051
2052    /// Simple equality for `FloatArray<T>`
2053    fn expect_float<T: num_traits::Float + std::fmt::Debug>(
2054        arr: &FloatArray<T>,
2055        values: &[T],
2056        valid: &[bool],
2057        eps: T,
2058    ) {
2059        let data = arr.data.as_slice();
2060        assert_eq!(data.len(), values.len());
2061        for (a, b) in data.iter().zip(values.iter()) {
2062            assert!((*a - *b).abs() <= eps, "value mismatch {:?} vs {:?}", a, b);
2063        }
2064        let mask = arr.null_mask.as_ref().expect("mask missing");
2065        for (i, &v) in valid.iter().enumerate() {
2066            assert_eq!(mask.get(i), v);
2067        }
2068    }
2069
2070    // ───────────────────────── Rolling kernels ─────────────────────────
2071
2072    #[test]
2073    fn test_rolling_sum_int_basic() {
2074        let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4, 5]);
2075        let res = rolling_sum_int((&arr, 0, arr.len()), 3);
2076        expect_int(&res, &[0, 0, 6, 9, 12], &[false, false, true, true, true]);
2077    }
2078
2079    #[test]
2080    fn test_rolling_sum_int_masked() {
2081        let mut arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4]);
2082        arr.null_mask = Some(bm(&[true, false, true, true]));
2083        let res = rolling_sum_int((&arr, 0, arr.len()), 2);
2084        expect_int(
2085            &res,
2086            &[0, 0, 3, 7],
2087            &[false, false, false, true], // window valid only when no nulls in window
2088        );
2089    }
2090
2091    #[test]
2092    fn test_rolling_sum_float() {
2093        let arr = FloatArray::<f32>::from_slice(&[1.0, 2.0, 3.0]);
2094        let res = rolling_sum_float((&arr, 0, arr.len()), 2);
2095        expect_float(&res, &[0.0, 0.0, 5.0], &[false, false, true], 1e-6f32);
2096    }
2097
2098    #[test]
2099    fn test_rolling_sum_bool() {
2100        let bools = BooleanArray::from_slice(&[true, true, false, true]);
2101        let res = rolling_sum_bool((&bools, 0, bools.len()), 2); // counts trues over window
2102        expect_int(&res, &[0, 0, 1, 1], &[false, false, true, true]);
2103    }
2104
2105    #[test]
2106    fn test_rolling_min_max_mean_count() {
2107        let arr = IntegerArray::<i32>::from_slice(&[3, 1, 4, 1, 5]);
2108        // min
2109        let rmin = rolling_min_int((&arr, 0, arr.len()), 2);
2110        expect_int(&rmin, &[0, 0, 1, 1, 1], &[false, false, true, true, true]);
2111
2112        // max
2113        let rmax = rolling_max_int((&arr, 0, arr.len()), 3);
2114        expect_int(&rmax, &[0, 0, 4, 4, 5], &[false, false, true, true, true]);
2115
2116        // mean
2117        let rmean = rolling_mean_int((&arr, 0, arr.len()), 2);
2118        expect_float(
2119            &rmean,
2120            &[0.0, 0.0, 2.5, 2.5, 3.0],
2121            &[false, false, true, true, true],
2122            1e-12,
2123        );
2124
2125        // count
2126        let cnt = rolling_count((0, 5), 3);
2127        expect_int(&cnt, &[0, 0, 3, 3, 3], &[false, false, true, true, true]);
2128    }
2129
2130    // ───────────────────────── Rank / Dense-rank ─────────────────────────
2131
2132    #[test]
2133    fn test_rank_int_basic() {
2134        let arr = IntegerArray::<i32>::from_slice(&[30, 10, 20]);
2135        let res = rank_int((&arr, 0, arr.len()));
2136        expect_int(&res, &[3, 1, 2], &[true, true, true]);
2137    }
2138
2139    #[test]
2140    fn test_rank_float_with_nulls() {
2141        let mut arr = FloatArray::<f64>::from_slice(&[2.0, 1.0, 3.0]);
2142        arr.null_mask = Some(bm(&[true, false, true]));
2143        let res = rank_float((&arr, 0, arr.len()));
2144        expect_int(&res, &[2, 0, 3], &[true, false, true]);
2145    }
2146
2147    #[test]
2148    fn test_dense_rank_str_duplicates() {
2149        let arr = StringArray::<u32>::from_slice(&["b", "a", "b", "c"]);
2150        let res = dense_rank_str((&arr, 0, arr.len())).unwrap();
2151        expect_int(&res, &[2, 1, 2, 3], &[true, true, true, true]);
2152    }
2153
2154    #[test]
2155    fn test_dense_rank_str_duplicates_chunk() {
2156        // Windowed over ["x", "b", "a", "b", "c", "y"]
2157        let arr = StringArray::<u32>::from_slice(&["x", "b", "a", "b", "c", "y"]);
2158        let res = dense_rank_str((&arr, 1, 4)).unwrap(); // window is "b", "a", "b", "c"
2159        expect_int(&res, &[2, 1, 2, 3], &[true, true, true, true]);
2160    }
2161
2162    // ───────────────────────── Lag / Lead / Shift ─────────────────────────
2163
2164    #[test]
2165    fn test_lag_lead_int() {
2166        let arr = IntegerArray::<i32>::from_slice(&[10, 20, 30, 40]);
2167        let lag1 = lag_int((&arr, 0, arr.len()), 1);
2168        expect_int(&lag1, &[0, 10, 20, 30], &[false, true, true, true]);
2169
2170        let lead2 = lead_int((&arr, 0, arr.len()), 2);
2171        expect_int(&lead2, &[30, 40, 0, 0], &[true, true, false, false]);
2172    }
2173
2174    #[test]
2175    fn test_shift_float_positive_negative_zero() {
2176        let arr = FloatArray::<f32>::from_slice(&[1.0, 2.0, 3.0]);
2177        let s0 = shift_float((&arr, 0, arr.len()), 0);
2178        assert_eq!(s0.data, arr.data); // exact copy
2179
2180        let s1 = shift_float((&arr, 0, arr.len()), 1);
2181        expect_float(&s1, &[2.0, 3.0, 0.0], &[true, true, false], 1e-6f32);
2182
2183        let s_neg = shift_float((&arr, 0, arr.len()), -1);
2184        expect_float(&s_neg, &[0.0, 1.0, 2.0], &[false, true, true], 1e-6f32);
2185    }
2186
2187    #[test]
2188    fn test_lag_lead_str() {
2189        let arr = StringArray::<u32>::from_slice(&["a", "b", "c"]);
2190        let l1 = lag_str((&arr, 0, arr.len()), 1).unwrap();
2191        assert_eq!(l1.get(0), None);
2192        assert_eq!(l1.get(1), Some("a"));
2193        assert_eq!(l1.get(2), Some("b"));
2194
2195        let d1 = lead_str((&arr, 0, arr.len()), 1).unwrap();
2196        assert_eq!(d1.get(0), Some("b"));
2197        assert_eq!(d1.get(1), Some("c"));
2198        assert_eq!(d1.get(2), None);
2199    }
2200
2201    #[test]
2202    fn test_lag_lead_str_chunk() {
2203        // Window is ["x", "a", "b", "c", "y"], test on chunk "a", "b", "c"
2204        let arr = StringArray::<u32>::from_slice(&["x", "a", "b", "c", "y"]);
2205        let l1 = lag_str((&arr, 1, 3), 1).unwrap();
2206        assert_eq!(l1.get(0), None);
2207        assert_eq!(l1.get(1), Some("a"));
2208        assert_eq!(l1.get(2), Some("b"));
2209
2210        let d1 = lead_str((&arr, 1, 3), 1).unwrap();
2211        assert_eq!(d1.get(0), Some("b"));
2212        assert_eq!(d1.get(1), Some("c"));
2213        assert_eq!(d1.get(2), None);
2214    }
2215
2216    #[test]
2217    fn test_rolling_sum_int_edge_windows() {
2218        let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4, 5]);
2219
2220        // window = 0 -> all zeros + mask all false
2221        let r0 = rolling_sum_int((&arr, 0, arr.len()), 0);
2222        assert_eq!(r0.data.as_slice(), &[0, 0, 0, 0, 0]);
2223        assert_eq!(r0.null_mask.as_ref().unwrap().all_unset(), true);
2224
2225        // window = 1 -> identity
2226        let r1 = rolling_sum_int((&arr, 0, arr.len()), 1);
2227        assert_eq!(r1.data.as_slice(), &[1, 2, 3, 4, 5]);
2228        assert!(r1.null_mask.as_ref().unwrap().all_set());
2229
2230        // window > len -> all zero + all false
2231        let r_large = rolling_sum_int((&arr, 0, arr.len()), 10);
2232        assert_eq!(r_large.data.as_slice(), &[0, 0, 0, 0, 0]);
2233        assert_eq!(r_large.null_mask.as_ref().unwrap().all_unset(), true);
2234    }
2235
2236    #[test]
2237    fn test_rolling_sum_float_masked_nulls_propagate() {
2238        let mut arr = FloatArray::<f32>::from_slice(&[1.0, 2.0, 3.0, 4.0]);
2239        // null in the middle
2240        arr.null_mask = Some(bm(&[true, true, false, true]));
2241        let r = rolling_sum_float((&arr, 0, arr.len()), 2);
2242        //   i=0: <full-window, 0.0, false>
2243        //   i=1: first full-window -> cleared -> 0.0, false
2244        //   i=2: window contains null -> mask=false, but value = 2.0
2245        //   i=3: window contains null -> mask=false, but value = 4.0
2246        expect_float(
2247            &r,
2248            &[0.0, 0.0, 2.0, 4.0],
2249            &[false, false, false, false],
2250            1e-6,
2251        );
2252    }
2253
2254    #[test]
2255    fn test_rolling_sum_bool_with_nulls() {
2256        let mut b = BooleanArray::from_slice(&[true, false, true, true]);
2257        b.null_mask = Some(bm(&[true, false, true, true]));
2258        let r = rolling_sum_bool((&b, 0, b.len()), 2);
2259        // windows [t,f], [f,t], [t,t] -> only last window is all non-null
2260        expect_int(&r, &[0, 0, 1, 2], &[false, false, false, true]);
2261    }
2262
2263    #[test]
2264    fn test_lag_str_null_propagation() {
2265        let mut arr = StringArray::<u32>::from_slice(&["x", "y", "z"]);
2266        arr.null_mask = Some(bm(&[true, false, true])); // y is null
2267        let lag1 = lag_str((&arr, 0, arr.len()), 1).unwrap();
2268        assert_eq!(lag1.get(0), None); // no source -> null
2269        assert_eq!(lag1.get(1), Some("x"));
2270        assert_eq!(lag1.get(2), None); // source was null
2271        let m = lag1.null_mask.unwrap();
2272        assert_eq!(m.get(0), false);
2273        assert_eq!(m.get(1), true);
2274        assert_eq!(m.get(2), false);
2275    }
2276
2277    #[test]
2278    fn test_lag_str_null_propagation_chunk() {
2279        // Window ["w", "x", "y", "z", "q"], test on chunk "x", "y", "z"
2280        let mut arr = StringArray::<u32>::from_slice(&["w", "x", "y", "z", "q"]);
2281        arr.null_mask = Some(bm(&[true, true, false, true, true]));
2282        let lag1 = lag_str((&arr, 1, 3), 1).unwrap();
2283        assert_eq!(lag1.get(0), None); // "x", index 0 in chunk, no source
2284        assert_eq!(lag1.get(1), Some("x")); // "y", index 1 pulls "x" (valid)
2285        assert_eq!(lag1.get(2), None); // "z", index 2 pulls "y" (invalid)
2286        let m = lag1.null_mask.unwrap();
2287        assert_eq!(m.get(0), false);
2288        assert_eq!(m.get(1), true);
2289        assert_eq!(m.get(2), false);
2290    }
2291
2292    #[test]
2293    fn test_shift_str_large_offset() {
2294        let arr = StringArray::<u32>::from_slice(&["a", "b", "c"]);
2295        let shifted = shift_str((&arr, 0, arr.len()), 10).unwrap(); // > len
2296        assert_eq!(shifted.len(), 3);
2297        for i in 0..3 {
2298            assert_eq!(shifted.get(i), None);
2299            assert_eq!(shifted.null_mask.as_ref().unwrap().get(i), false);
2300        }
2301    }
2302
2303    #[test]
2304    fn test_shift_str_large_offset_chunk() {
2305        // Window ["w", "a", "b", "c", "x"]
2306        let arr = StringArray::<u32>::from_slice(&["w", "a", "b", "c", "x"]);
2307        let shifted = shift_str((&arr, 1, 3), 10).unwrap(); // window is "a","b","c"
2308        assert_eq!(shifted.len(), 3);
2309        for i in 0..3 {
2310            assert_eq!(shifted.get(i), None);
2311            assert_eq!(shifted.null_mask.as_ref().unwrap().get(i), false);
2312        }
2313    }
2314
2315    #[test]
2316    fn test_rank_str_ties_and_nulls() {
2317        let mut arr = StringArray::<u32>::from_slice(&["a", "b", "a", "c"]);
2318        arr.null_mask = Some(bm(&[true, true, false, true]));
2319        let r = rank_str((&arr, 0, arr.len())).unwrap();
2320        // valid positions: 0="a"(rank1),1="b"(rank3),2=null,3="c"(rank4)
2321        expect_int(&r, &[1, 2, 0, 3], &[true, true, false, true]);
2322    }
2323
2324    #[test]
2325    fn test_rank_str_ties_and_nulls_chunk() {
2326        // Window ["w", "a", "b", "a", "c"]
2327        let mut arr = StringArray::<u32>::from_slice(&["w", "a", "b", "a", "c"]);
2328        arr.null_mask = Some(bm(&[true, true, true, false, true]));
2329        let r = rank_str((&arr, 1, 4)).unwrap(); // "a","b","a","c"
2330        expect_int(&r, &[1, 2, 0, 3], &[true, true, false, true]);
2331    }
2332}