simd_kernels/kernels/window.rs
1// Copyright Peter Bower 2025. All Rights Reserved.
2// Licensed under Mozilla Public License (MPL) 2.0.
3
4//! # **Window Functions Kernels Module** - *High-Performance Analytical Window Operations*
5//!
6//! Advanced window function kernels for sliding window computations,
7//! ranking operations, and positional analytics with SIMD acceleration and null-aware semantics.
8//! Backbone of time series analysis, analytical SQL window functions, and chunked streaming computations.
9//!
10//! ## Core Operations
11//! - **Moving averages**: Rolling mean calculations with configurable window sizes
12//! - **Cumulative functions**: Running sums, products, and statistical aggregations
13//! - **Ranking functions**: ROW_NUMBER, RANK, DENSE_RANK with tie-handling strategies
14//! - **Lead/lag operations**: Positional value access with configurable offsets
15//! - **Percentile functions**: Moving quantile calculations with interpolation support
16//! - **Window aggregates**: MIN, MAX, SUM operations over sliding windows
17
18include!(concat!(env!("OUT_DIR"), "/simd_lanes.rs"));
19
20use std::marker::PhantomData;
21
22use minarrow::{
23 Bitmask, BooleanAVT, BooleanArray, FloatArray, Integer, IntegerArray, Length, MaskedArray,
24 Offset, StringArray, Vec64,
25 aliases::{FloatAVT, IntegerAVT},
26 vec64,
27};
28use num_traits::{Float, Num, NumCast, One, Zero};
29
30use crate::{errors::KernelError, utils::confirm_mask_capacity};
31use minarrow::StringAVT;
32
33// Helpers
34#[inline(always)]
35fn new_null_mask(len: usize) -> Bitmask {
36 Bitmask::new_set_all(len, false)
37}
38
39#[inline(always)]
40fn prealloc_vec<T: Copy>(len: usize) -> Vec64<T> {
41 let mut v = Vec64::<T>::with_capacity(len);
42 unsafe { v.set_len(len) };
43 v
44}
45
46// Rolling kernels (sum, product, min, max, mean, count)
47
48/// Generic sliding window aggregator for kernels that allow an
49/// incremental push and pop update (sum, product, etc.).
50/// Always emits the running aggregate, even when the subwindow has nulls.
51/// Only flags “valid” once the full subwindow has been seen.
52#[inline(always)]
53fn rolling_push_pop<T, FAdd, FRem>(
54 data: &[T],
55 mask: Option<&Bitmask>,
56 subwindow: usize,
57 mut add: FAdd,
58 mut remove: FRem,
59 zero: T,
60) -> (Vec64<T>, Bitmask)
61where
62 T: Copy,
63 FAdd: FnMut(T, T) -> T,
64 FRem: FnMut(T, T) -> T,
65{
66 let n = data.len();
67 let mut out = prealloc_vec::<T>(n);
68 let mut out_mask = new_null_mask(n);
69
70 if subwindow == 0 {
71 for slot in &mut out {
72 *slot = zero;
73 }
74 return (out, out_mask);
75 }
76
77 let mut agg = zero;
78 let mut invalids = 0usize;
79 for i in 0..n {
80 if mask.map_or(true, |m| unsafe { m.get_unchecked(i) }) {
81 agg = add(agg, data[i]);
82 } else {
83 invalids += 1;
84 }
85 if i + 1 > subwindow {
86 let j = i + 1 - subwindow - 1;
87 if mask.map_or(true, |m| unsafe { m.get_unchecked(j) }) {
88 agg = remove(agg, data[j]);
89 } else {
90 invalids -= 1;
91 }
92 }
93 if i + 1 < subwindow {
94 unsafe { out_mask.set_unchecked(i, false) };
95 out[i] = zero;
96 } else {
97 let ok = invalids == 0;
98 unsafe { out_mask.set_unchecked(i, ok) };
99 out[i] = agg;
100 }
101 }
102 (out, out_mask)
103}
104
105/// Generic rolling extreme aggregator (min/max) for a subwindow over a slice.
106#[inline(always)]
107fn rolling_extreme<T, F>(
108 data: &[T],
109 mask: Option<&Bitmask>,
110 subwindow: usize,
111 mut better: F,
112 zero: T,
113) -> (Vec64<T>, Bitmask)
114where
115 T: Copy,
116 F: FnMut(&T, &T) -> bool,
117{
118 let n = data.len();
119 let mut out = prealloc_vec::<T>(n);
120 let mut out_mask = new_null_mask(n);
121
122 if subwindow == 0 {
123 return (out, out_mask);
124 }
125
126 for i in 0..n {
127 if i + 1 < subwindow {
128 unsafe { out_mask.set_unchecked(i, false) };
129 out[i] = zero;
130 continue;
131 }
132 let start = i + 1 - subwindow;
133 let mut found = false;
134 let mut extreme = zero;
135 for j in start..=i {
136 if mask.map_or(true, |m| unsafe { m.get_unchecked(j) }) {
137 if !found {
138 extreme = data[j];
139 found = true;
140 } else if better(&data[j], &extreme) {
141 extreme = data[j];
142 }
143 } else {
144 found = false;
145 break;
146 }
147 }
148 unsafe { out_mask.set_unchecked(i, found) };
149 out[i] = if found { extreme } else { zero };
150 }
151 (out, out_mask)
152}
153
154/// Computes rolling sums over a sliding window for integer data with null-aware semantics.
155///
156/// Applies a sliding window of configurable size to compute cumulative sums, employing
157/// incremental computation to avoid O(n²) complexity through efficient push-pop operations.
158/// Each position in the output represents the sum of values within the preceding window.
159///
160/// ## Parameters
161/// * `window` - Integer array view containing the data, offset, and length information
162/// * `subwindow` - Size of the sliding window (number of elements to sum)
163///
164/// ## Returns
165/// Returns an `IntegerArray<T>` containing:
166/// - Rolling sums for each position where a complete window exists
167/// - Zero values for positions before the window is complete
168/// - Null mask indicating validity (false for incomplete windows or null-contaminated windows)
169///
170/// ## Examples
171/// ```rust,ignore
172/// use minarrow::IntegerArray;
173/// use simd_kernels::kernels::window::rolling_sum_int;
174///
175/// let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4, 5]);
176/// let result = rolling_sum_int((&arr, 0, arr.len()), 3);
177/// ```
178#[inline]
179pub fn rolling_sum_int<T: Num + Copy + Zero>(
180 window: IntegerAVT<'_, T>,
181 subwindow: usize,
182) -> IntegerArray<T> {
183 let (arr, offset, len) = window;
184 let data = &arr.data[offset..offset + len];
185 let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
186 let (mut out, mut out_mask) = rolling_push_pop(
187 data,
188 mask.as_ref(),
189 subwindow,
190 |a, b| a + b,
191 |a, b| a - b,
192 T::zero(),
193 );
194 if arr.null_mask.is_some() && subwindow > 0 && subwindow - 1 < out.len() {
195 unsafe { out_mask.set_unchecked(subwindow - 1, false) };
196 out[subwindow - 1] = T::zero();
197 }
198 IntegerArray {
199 data: out.into(),
200 null_mask: Some(out_mask),
201 }
202}
203
204/// Computes rolling sums over a sliding window for floating-point data with IEEE 754 compliance.
205///
206/// Applies incremental computation to calculate cumulative sums across sliding windows,
207/// maintaining numerical stability through careful accumulation strategies. Handles
208/// special floating-point values (infinity, NaN) according to IEEE 754 semantics.
209///
210/// ## Parameters
211/// * `window` - Float array view containing the data, offset, and length information
212/// * `subwindow` - Size of the sliding window for summation
213///
214/// ## Returns
215/// Returns a `FloatArray<T>` containing:
216/// - Rolling sums computed incrementally for efficiency
217/// - Zero values for positions with incomplete windows
218/// - Proper null mask for window validity tracking
219///
220/// ## Examples
221/// ```rust,ignore
222/// use minarrow::FloatArray;
223/// use simd_kernels::kernels::window::rolling_sum_float;
224///
225/// let arr = FloatArray::<f64>::from_slice(&[1.5, 2.3, 3.7, 4.1]);
226/// let result = rolling_sum_float((&arr, 0, arr.len()), 2);
227/// ```
228#[inline]
229pub fn rolling_sum_float<T: Float + Copy + Zero>(
230 window: FloatAVT<'_, T>,
231 subwindow: usize,
232) -> FloatArray<T> {
233 let (arr, offset, len) = window;
234 let data = &arr.data[offset..offset + len];
235 let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
236 let (mut out, mut out_mask) = rolling_push_pop(
237 data,
238 mask.as_ref(),
239 subwindow,
240 |a, b| a + b,
241 |a, b| a - b,
242 T::zero(),
243 );
244 if subwindow > 0 && subwindow - 1 < out.len() {
245 out_mask.set(subwindow - 1, false);
246 out[subwindow - 1] = T::zero();
247 }
248 FloatArray {
249 data: out.into(),
250 null_mask: Some(out_mask),
251 }
252}
253
254/// Computes rolling sums over boolean data, counting true values within sliding windows.
255///
256/// Treats boolean values as integers (true=1, false=0) and applies sliding window
257/// summation to count true occurrences within each window position. Essential for
258/// constructing conditional aggregations and boolean pattern analysis.
259///
260/// ## Parameters
261/// * `window` - Boolean array view with offset and length specifications
262/// * `subwindow` - Number of boolean values to consider in each sliding window
263///
264/// ## Returns
265/// Returns an `IntegerArray<i32>` containing:
266/// - Count of true values within each complete window
267/// - Zero for positions with incomplete windows
268/// - Null mask indicating window completeness and null contamination
269///
270/// ## Examples
271/// ```rust,ignore
272/// use minarrow::BooleanArray;
273/// use simd_kernels::kernels::window::rolling_sum_bool;
274///
275/// let bools = BooleanArray::from_slice(&[true, false, true, true]);
276/// let result = rolling_sum_bool((&bools, 0, bools.len()), 2);
277/// ```
278#[inline]
279pub fn rolling_sum_bool(window: BooleanAVT<'_, ()>, subwindow: usize) -> IntegerArray<i32> {
280 let (arr, offset, len) = window;
281 let bools: Vec<i32> = arr.iter_range(offset, len).map(|b| b as i32).collect();
282 let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
283 let (mut out, mut out_mask) = rolling_push_pop(
284 &bools,
285 mask.as_ref(),
286 subwindow,
287 |a, b| a + b,
288 |a, b| a - b,
289 0,
290 );
291 if subwindow > 0 && subwindow - 1 < out.len() {
292 out_mask.set(subwindow - 1, false);
293 out[subwindow - 1] = 0;
294 }
295 IntegerArray {
296 data: out.into(),
297 null_mask: Some(out_mask),
298 }
299}
300
301/// Computes rolling products over a sliding window for integer data with overflow protection.
302///
303/// Applies multiplicative aggregation across sliding windows using incremental computation
304/// through division operations. Maintains numerical stability through careful handling of
305/// zero values and potential overflow conditions in integer arithmetic.
306///
307/// ## Parameters
308/// * `window` - Integer array view containing multiplicands and window specification
309/// * `subwindow` - Number of consecutive elements to multiply in each window
310///
311/// ## Returns
312/// Returns an `IntegerArray<T>` containing:
313/// - Rolling products computed via incremental multiplication/division
314/// - Identity value (1) for positions with incomplete windows
315/// - Null mask reflecting window completeness and null contamination
316///
317/// ## Examples
318/// ```rust,ignore
319/// use minarrow::IntegerArray;
320/// use simd_kernels::kernels::window::rolling_product_int;
321///
322/// let arr = IntegerArray::<i32>::from_slice(&[2, 3, 4, 5]);
323/// let result = rolling_product_int((&arr, 0, arr.len()), 2);
324/// ```
325#[inline]
326pub fn rolling_product_int<T: Num + Copy + One + Zero>(
327 window: IntegerAVT<'_, T>,
328 subwindow: usize,
329) -> IntegerArray<T> {
330 let (arr, offset, len) = window;
331 let data = &arr.data[offset..offset + len];
332 let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
333 let (out, out_mask) = rolling_push_pop(
334 data,
335 mask.as_ref(),
336 subwindow,
337 |a, b| a * b,
338 |a, b| a / b,
339 T::one(),
340 );
341 IntegerArray {
342 data: out.into(),
343 null_mask: Some(out_mask),
344 }
345}
346
347/// Computes rolling products over floating-point data with IEEE 754 mathematical semantics.
348///
349/// Performs multiplicative aggregation using incremental computation strategies that
350/// maintain numerical precision through careful handling of special values (infinity,
351/// NaN, zero) according to IEEE 754 standards.
352///
353/// ## Parameters
354/// * `window` - Float array view containing multiplicands for window processing
355/// * `subwindow` - Window size determining number of values to multiply
356///
357/// ## Returns
358/// Returns a `FloatArray<T>` containing:
359/// - Rolling products computed with floating-point precision
360/// - Identity value (1.0) for incomplete window positions
361/// - Comprehensive null mask for validity tracking
362///
363/// ## Examples
364/// ```rust,ignore
365/// use minarrow::FloatArray;
366/// use simd_kernels::kernels::window::rolling_product_float;
367///
368/// let arr = FloatArray::<f64>::from_slice(&[1.5, 2.0, 3.0, 4.0]);
369/// let result = rolling_product_float((&arr, 0, arr.len()), 2);
370/// ```
371#[inline]
372pub fn rolling_product_float<T: Float + Copy + One + Zero>(
373 window: FloatAVT<'_, T>,
374 subwindow: usize,
375) -> FloatArray<T> {
376 let (arr, offset, len) = window;
377 let data = &arr.data[offset..offset + len];
378 let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
379 let (out, out_mask) = rolling_push_pop(
380 data,
381 mask.as_ref(),
382 subwindow,
383 |a, b| a * b,
384 |a, b| a / b,
385 T::one(),
386 );
387 FloatArray {
388 data: out.into(),
389 null_mask: Some(out_mask),
390 }
391}
392
393/// Computes rolling logical AND operations over boolean data within sliding windows.
394///
395/// Treats boolean multiplication as logical AND operations, computing the conjunction
396/// of all boolean values within each sliding window. Essential for constructing
397/// compound logical conditions and boolean pattern validation.
398///
399/// ## Parameters
400/// * `window` - Boolean array view containing logical values for conjunction
401/// * `subwindow` - Number of boolean values to AND together in each window
402///
403/// ## Returns
404/// Returns a `BooleanArray<()>` containing:
405/// - Logical AND results for each complete window position
406/// - False values for positions with incomplete windows
407/// - Null mask indicating window validity and null contamination
408///
409/// ## Examples
410/// ```rust,ignore
411/// use minarrow::BooleanArray;
412/// use simd_kernels::kernels::window::rolling_product_bool;
413///
414/// let bools = BooleanArray::from_slice(&[true, true, false, true]);
415/// let result = rolling_product_bool((&bools, 0, bools.len()), 2);
416/// ```
417#[inline]
418pub fn rolling_product_bool(window: BooleanAVT<'_, ()>, subwindow: usize) -> BooleanArray<()> {
419 let (arr, offset, len) = window;
420 let n = len;
421 let mut out_mask = new_null_mask(n);
422 let mut out = Bitmask::new_set_all(n, false);
423
424 for i in 0..n {
425 let start = if i + 1 >= subwindow {
426 i + 1 - subwindow
427 } else {
428 0
429 };
430 let mut acc = true;
431 let mut valid = subwindow > 0 && i + 1 >= subwindow;
432 for j in start..=i {
433 match unsafe { arr.get_unchecked(offset + j) } {
434 Some(val) => acc &= val,
435 None => {
436 valid = false;
437 break;
438 }
439 }
440 }
441 unsafe { out_mask.set_unchecked(i, valid) };
442 out.set(i, valid && acc);
443 }
444
445 BooleanArray {
446 data: out.into(),
447 null_mask: Some(out_mask),
448 len: n,
449 _phantom: PhantomData,
450 }
451}
452
453/// Computes rolling arithmetic means over integer data with high-precision floating-point output.
454///
455/// Calculates moving averages across sliding windows, converting integer inputs to double-precision
456/// floating-point for accurate mean computation. Essential for time series analysis and statistical
457/// smoothing operations over integer sequences.
458///
459/// ## Parameters
460/// * `window` - Integer array view containing values for mean calculation
461/// * `subwindow` - Window size determining number of values to average
462///
463/// ## Returns
464/// Returns a `FloatArray<f64>` containing:
465/// - Rolling arithmetic means computed with double precision
466/// - Zero values for positions with incomplete windows
467/// - Null mask indicating window completeness and null contamination
468///
469/// ## Examples
470/// ```rust,ignore
471/// use minarrow::IntegerArray;
472/// use simd_kernels::kernels::window::rolling_mean_int;
473///
474/// let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4, 5]);
475/// let result = rolling_mean_int((&arr, 0, arr.len()), 3);
476/// ```
477#[inline]
478pub fn rolling_mean_int<T: NumCast + Copy + Zero>(
479 window: IntegerAVT<'_, T>,
480 subwindow: usize,
481) -> FloatArray<f64> {
482 let (arr, offset, len) = window;
483 let n = len;
484 let mask_ref = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
485 let mut out = prealloc_vec::<f64>(n);
486 let mut out_mask = new_null_mask(n);
487
488 if subwindow == 0 {
489 return FloatArray {
490 data: out.into(),
491 null_mask: Some(out_mask),
492 };
493 }
494
495 for i in 0..n {
496 if i + 1 < subwindow {
497 unsafe { out_mask.set_unchecked(i, false) };
498 out[i] = 0.0;
499 continue;
500 }
501 let start = i + 1 - subwindow;
502 let mut sum = 0.0;
503 let mut valid = true;
504 for j in start..=i {
505 if mask_ref
506 .as_ref()
507 .map_or(true, |m| unsafe { m.get_unchecked(j) })
508 {
509 sum += num_traits::cast(arr.data[offset + j]).unwrap_or(0.0);
510 } else {
511 valid = false;
512 break;
513 }
514 }
515 unsafe { out_mask.set_unchecked(i, valid) };
516 out[i] = if valid { sum / subwindow as f64 } else { 0.0 };
517 }
518
519 if subwindow > 0 && subwindow - 1 < out.len() {
520 unsafe { out_mask.set_unchecked(subwindow - 1, false) };
521 out[subwindow - 1] = 0.0;
522 }
523
524 FloatArray {
525 data: out.into(),
526 null_mask: Some(out_mask),
527 }
528}
529
530/// Computes rolling arithmetic means over floating-point data with IEEE 754 compliance.
531///
532/// Performs moving average calculations across sliding windows while maintaining the original
533/// floating-point precision. Implements numerically stable summation strategies and proper
534/// handling of special floating-point values (NaN, infinity).
535///
536/// ## Parameters
537/// * `window` - Float array view containing values for mean calculation
538/// * `subwindow` - Window size specifying number of values to average
539///
540/// ## Returns
541/// Returns a `FloatArray<T>` containing:
542/// - Rolling arithmetic means preserving input precision (f32 or f64)
543/// - Zero values for positions with incomplete windows
544/// - Comprehensive null mask for validity indication
545///
546/// ## Examples
547/// ```rust,ignore
548/// use minarrow::FloatArray;
549/// use simd_kernels::kernels::window::rolling_mean_float;
550///
551/// let arr = FloatArray::<f32>::from_slice(&[1.5, 2.5, 3.5, 4.5]);
552/// let result = rolling_mean_float((&arr, 0, arr.len()), 2);
553/// ```
554#[inline]
555pub fn rolling_mean_float<T: Float + Copy + Zero>(
556 window: FloatAVT<'_, T>,
557 subwindow: usize,
558) -> FloatArray<T> {
559 let (arr, offset, len) = window;
560 let n = len;
561 let mask_ref = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
562 let mut out = prealloc_vec::<T>(n);
563 let mut out_mask = new_null_mask(n);
564
565 if subwindow == 0 {
566 return FloatArray {
567 data: out.into(),
568 null_mask: Some(out_mask),
569 };
570 }
571
572 for i in 0..n {
573 if i + 1 < subwindow {
574 unsafe { out_mask.set_unchecked(i, false) };
575 out[i] = T::zero();
576 continue;
577 }
578 let start = i + 1 - subwindow;
579 let mut sum = T::zero();
580 let mut valid = true;
581 for j in start..=i {
582 if mask_ref
583 .as_ref()
584 .map_or(true, |m| unsafe { m.get_unchecked(j) })
585 {
586 sum = sum + arr.data[offset + j];
587 } else {
588 valid = false;
589 break;
590 }
591 }
592 unsafe { out_mask.set_unchecked(i, valid) };
593 out[i] = if valid {
594 sum / T::from(subwindow as u32).unwrap()
595 } else {
596 T::zero()
597 };
598 }
599
600 if subwindow > 0 && subwindow - 1 < out.len() {
601 unsafe { out_mask.set_unchecked(subwindow - 1, false) };
602 out[subwindow - 1] = T::zero();
603 }
604
605 FloatArray {
606 data: out.into(),
607 null_mask: Some(out_mask),
608 }
609}
610
611/// For min, we skip that very first `window` slot so that
612/// the _first_ non-zero result appears one step later.
613/// Computes rolling minimum values over integer data within sliding windows.
614///
615/// Identifies minimum values across sliding windows using efficient comparison strategies.
616/// Each output position represents the smallest value found within the preceding window,
617/// essential for trend analysis and outlier detection in integer sequences.
618///
619/// ## Parameters
620/// * `window` - Integer array view containing values for minimum detection
621/// * `subwindow` - Window size determining scope of minimum search
622///
623/// ## Returns
624/// Returns an `IntegerArray<T>` containing:
625/// - Rolling minimum values for each complete window position
626/// - Zero values for positions with incomplete windows
627/// - Null mask indicating window completeness and validity
628///
629/// ## Use Cases
630/// - **Trend analysis**: Identifying minimum trends in time series data
631/// - **Outlier detection**: Finding exceptionally low values within windows
632/// - **Signal processing**: Detecting minimum signal levels over time intervals
633/// - **Statistical analysis**: Computing rolling minimum statistics
634///
635/// ## Examples
636/// ```rust,ignore
637/// use minarrow::IntegerArray;
638/// use simd_kernels::kernels::window::rolling_min_int;
639///
640/// let arr = IntegerArray::<i32>::from_slice(&[5, 2, 8, 1, 9]);
641/// let result = rolling_min_int((&arr, 0, arr.len()), 3);
642/// // Output: [0, 0, 2, 1, 1] - minimum values in each 3-element window
643/// ```
644#[inline]
645pub fn rolling_min_int<T: Ord + Copy + Zero>(
646 window: IntegerAVT<'_, T>,
647 subwindow: usize,
648) -> IntegerArray<T> {
649 let (arr, offset, len) = window;
650 let data = &arr.data[offset..offset + len];
651 let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
652 let (mut out, mut out_mask) =
653 rolling_extreme(data, mask.as_ref(), subwindow, |a, b| a < b, T::zero());
654 if subwindow > 0 && subwindow - 1 < out.len() {
655 out_mask.set(subwindow - 1, false);
656 out[subwindow - 1] = T::zero();
657 }
658 IntegerArray {
659 data: out.into(),
660 null_mask: Some(out_mask),
661 }
662}
663
664/// Computes rolling maximum values over integer data within sliding windows.
665///
666/// Identifies maximum values across sliding windows using efficient comparison operations.
667/// Each output position represents the largest value found within the preceding window,
668/// crucial for peak detection and trend analysis in integer data sequences.
669///
670/// ## Parameters
671/// * `window` - Integer array view containing values for maximum detection
672/// * `subwindow` - Window size determining scope of maximum search
673///
674/// ## Returns
675/// Returns an `IntegerArray<T>` containing:
676/// - Rolling maximum values for each complete window position
677/// - Zero values for positions with incomplete windows
678/// - Comprehensive null mask for validity tracking
679///
680/// ## Applications
681/// - **Peak detection**: Identifying maximum peaks in time series data
682/// - **Trend analysis**: Tracking maximum value trends over sliding intervals
683/// - **Threshold monitoring**: Detecting when maximum values exceed thresholds
684/// - **Signal analysis**: Finding maximum signal amplitudes in windows
685///
686/// ## Examples
687/// ```rust,ignore
688/// use minarrow::IntegerArray;
689/// use simd_kernels::kernels::window::rolling_max_int;
690///
691/// let arr = IntegerArray::<i32>::from_slice(&[3, 7, 2, 9, 4]);
692/// let result = rolling_max_int((&arr, 0, arr.len()), 3);
693/// // Output: [0, 0, 7, 9, 9] - maximum values in each 3-element window
694/// ```
695#[inline]
696pub fn rolling_max_int<T: Ord + Copy + Zero>(
697 window: IntegerAVT<'_, T>,
698 subwindow: usize,
699) -> IntegerArray<T> {
700 let (arr, offset, len) = window;
701 let data = &arr.data[offset..offset + len];
702 let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
703 let (out, out_mask) = rolling_extreme(data, mask.as_ref(), subwindow, |a, b| a > b, T::zero());
704 IntegerArray {
705 data: out.into(),
706 null_mask: Some(out_mask),
707 }
708}
709
710/// Computes rolling minimum values over floating-point data with IEEE 754 compliance.
711///
712/// Identifies minimum floating-point values across sliding windows while properly handling
713/// special values (NaN, infinity) according to IEEE 754 standards. Essential for numerical
714/// analysis requiring precise minimum detection in floating-point sequences.
715///
716/// ## Parameters
717/// * `window` - Float array view containing values for minimum computation
718/// * `subwindow` - Window size determining minimum search scope
719///
720/// ## Returns
721/// Returns a `FloatArray<T>` containing:
722/// - Rolling minimum values computed with floating-point precision
723/// - Zero values for positions with incomplete windows
724/// - Null mask reflecting window validity and special value handling
725///
726/// ## Applications
727/// - **Scientific computing**: Finding minimum values in numerical simulations
728/// - **Signal processing**: Detecting minimum signal levels with high precision
729/// - **Financial analysis**: Identifying minimum prices in sliding time windows
730/// - **Data analysis**: Computing rolling minimum statistics for trend detection
731///
732/// ## Examples
733/// ```rust,ignore
734/// use minarrow::FloatArray;
735/// use simd_kernels::kernels::window::rolling_min_float;
736///
737/// let arr = FloatArray::<f64>::from_slice(&[3.14, 2.71, 1.41, 1.73]);
738/// let result = rolling_min_float((&arr, 0, arr.len()), 2);
739/// // Output: [0.0, 0.0, 2.71, 1.41] - minimum values in 2-element windows
740/// ```
741#[inline]
742pub fn rolling_min_float<T: Float + Copy + Zero>(
743 window: FloatAVT<'_, T>,
744 subwindow: usize,
745) -> FloatArray<T> {
746 let (arr, offset, len) = window;
747 let data = &arr.data[offset..offset + len];
748 let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
749 let (mut out, mut out_mask) =
750 rolling_extreme(data, mask.as_ref(), subwindow, |a, b| a < b, T::zero());
751 if subwindow > 0 && subwindow - 1 < out.len() {
752 out_mask.set(subwindow - 1, false);
753 out[subwindow - 1] = T::zero();
754 }
755 FloatArray {
756 data: out.into(),
757 null_mask: Some(out_mask),
758 }
759}
760
761/// Computes rolling maximum values over floating-point data with IEEE 754 compliance.
762///
763/// Identifies maximum floating-point values across sliding windows while maintaining
764/// strict adherence to IEEE 754 comparison semantics. Handles special floating-point
765/// values (NaN, infinity) correctly for reliable maximum detection.
766///
767/// ## Parameters
768/// * `window` - Float array view containing values for maximum computation
769/// * `subwindow` - Window size specifying maximum search scope
770///
771/// ## Returns
772/// Returns a `FloatArray<T>` containing:
773/// - Rolling maximum values with full floating-point precision
774/// - Zero values for positions with incomplete windows
775/// - Comprehensive null mask for result validity indication
776///
777/// ## Applications
778/// - **Peak detection**: Identifying maximum peaks in continuous data streams
779/// - **Envelope detection**: Computing upper envelopes of signal data
780/// - **Risk analysis**: Finding maximum risk values in financial time series
781/// - **Scientific measurement**: Detecting maximum readings in sensor data
782///
783/// ## Examples
784/// ```rust,ignore
785/// use minarrow::FloatArray;
786/// use simd_kernels::kernels::window::rolling_max_float;
787///
788/// let arr = FloatArray::<f32>::from_slice(&[1.5, 3.2, 2.1, 4.7]);
789/// let result = rolling_max_float((&arr, 0, arr.len()), 2);
790/// ```
791#[inline]
792pub fn rolling_max_float<T: Float + Copy + Zero>(
793 window: FloatAVT<'_, T>,
794 subwindow: usize,
795) -> FloatArray<T> {
796 let (arr, offset, len) = window;
797 let data = &arr.data[offset..offset + len];
798 let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
799 let (out, out_mask) = rolling_extreme(data, mask.as_ref(), subwindow, |a, b| a > b, T::zero());
800 FloatArray {
801 data: out.into(),
802 null_mask: Some(out_mask),
803 }
804}
805
806/// Computes rolling counts of elements within sliding windows for positional analysis.
807///
808/// Counts the number of elements present within each sliding window position, providing
809/// fundamental cardinality information essential for statistical analysis and data validation.
810/// Unlike other rolling functions, operates on position information rather than data values.
811///
812/// ## Parameters
813/// * `window` - Tuple containing offset and length defining the data window scope
814/// * `subwindow` - Size of sliding window for element counting
815///
816/// ## Returns
817/// Returns an `IntegerArray<i32>` containing:
818/// - Count of elements in each complete window (always equals subwindow size)
819/// - Zero values for positions with incomplete windows
820/// - Null mask indicating where complete windows exist
821///
822/// ## Examples
823/// ```rust,ignore
824/// use simd_kernels::kernels::window::rolling_count;
825///
826/// let result = rolling_count((0, 5), 3); // 5 elements, window size 3
827/// ```
828#[inline]
829pub fn rolling_count(window: (Offset, Length), subwindow: usize) -> IntegerArray<i32> {
830 let (_offset, len) = window;
831 let mut out = prealloc_vec::<i32>(len);
832 let mut out_mask = new_null_mask(len);
833 for i in 0..len {
834 let start = if i + 1 >= subwindow {
835 i + 1 - subwindow
836 } else {
837 0
838 };
839 let count = (i - start + 1) as i32;
840 let valid_row = subwindow > 0 && i + 1 >= subwindow;
841 unsafe { out_mask.set_unchecked(i, valid_row) };
842 out[i] = if valid_row { count } else { 0 };
843 }
844 IntegerArray {
845 data: out.into(),
846 null_mask: Some(out_mask),
847 }
848}
849
850// Rank and Dense-rank kernels
851
852#[inline(always)]
853fn rank_numeric<T, F>(data: &[T], mask: Option<&Bitmask>, mut cmp: F) -> IntegerArray<i32>
854where
855 T: Copy,
856 F: FnMut(&T, &T) -> std::cmp::Ordering,
857{
858 let n = data.len();
859 let mut indices: Vec<usize> = (0..n).collect();
860 indices.sort_by(|&i, &j| cmp(&data[i], &data[j]));
861
862 let mut out = vec64![0i32; n];
863 let mut out_mask = Bitmask::new_set_all(n, false);
864
865 for (rank, &i) in indices.iter().enumerate() {
866 if mask.map_or(true, |m| unsafe { m.get_unchecked(i) }) {
867 out[i] = (rank + 1) as i32;
868 unsafe { out_mask.set_unchecked(i, true) };
869 }
870 }
871
872 IntegerArray {
873 data: out.into(),
874 null_mask: Some(out_mask),
875 }
876}
877
878/// Computes standard SQL ROW_NUMBER() ranking for integer data with tie handling.
879///
880/// Assigns sequential rank values to elements based on their sorted order, providing
881/// standard SQL ROW_NUMBER() semantics where tied values receive different ranks.
882/// Essential for analytical queries requiring unique positional ranking.
883///
884/// ## Parameters
885/// * `window` - Integer array view containing values for ranking
886///
887/// ## Returns
888/// Returns an `IntegerArray<i32>` containing:
889/// - Rank values from 1 to n for valid elements
890/// - Zero values for null elements
891/// - Null mask indicating which positions have valid ranks
892///
893/// ## Ranking Semantics
894/// - **ROW_NUMBER() behaviour**: Each element receives a unique rank (1, 2, 3, ...)
895/// - **Tie breaking**: Tied values receive different ranks based on their position
896/// - **Ascending order**: Smaller values receive lower (better) ranks
897/// - **Null exclusion**: Null values are excluded from ranking and receive rank 0
898///
899/// ## Use Cases
900/// - **Analytical queries**: SQL ROW_NUMBER() window function implementation
901/// - **Leaderboards**: Creating ordered rankings with unique positions
902/// - **Percentile calculation**: Basis for percentile and quartile computations
903/// - **Data analysis**: Establishing ordinality in integer datasets
904///
905/// ## Examples
906/// ```rust,ignore
907/// use minarrow::IntegerArray;
908/// use simd_kernels::kernels::window::rank_int;
909///
910/// let arr = IntegerArray::<i32>::from_slice(&[30, 10, 20, 10]);
911/// let result = rank_int((&arr, 0, arr.len()));
912/// // Output: [4, 1, 3, 2] - ROW_NUMBER() style ranking
913/// ```
914#[inline(always)]
915pub fn rank_int<T: Ord + Copy>(window: IntegerAVT<T>) -> IntegerArray<i32> {
916 let (arr, offset, len) = window;
917 let data = &arr.data[offset..offset + len];
918 let null_mask = if len != arr.data.len() {
919 &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
920 } else {
921 &arr.null_mask
922 };
923 rank_numeric(data, null_mask.as_ref(), |a, b| a.cmp(b))
924}
925
926/// Computes standard SQL ROW_NUMBER() ranking for floating-point data with IEEE 754 compliance.
927///
928/// Assigns sequential rank values based on sorted floating-point order, implementing
929/// ROW_NUMBER() semantics with proper handling of special floating-point values (NaN,
930/// infinity) according to IEEE 754 comparison standards.
931///
932/// ## Parameters
933/// * `window` - Float array view containing values for ranking
934///
935/// ## Returns
936/// Returns an `IntegerArray<i32>` containing:
937/// - Rank values from 1 to n for valid, non-NaN elements
938/// - Zero values for null or NaN elements
939/// - Null mask indicating positions with valid ranks
940///
941/// ## Floating-Point Ranking
942/// - **IEEE 754 ordering**: Uses IEEE 754 compliant comparison operations
943/// - **NaN handling**: NaN values are excluded from ranking (receive rank 0)
944/// - **Infinity treatment**: Positive/negative infinity participate in ranking
945/// - **Precision preservation**: Maintains full floating-point comparison precision
946///
947/// ## Ranking Semantics
948/// - **ROW_NUMBER() style**: Each non-NaN element receives unique sequential rank
949/// - **Ascending order**: Smaller floating-point values receive lower ranks
950/// - **Tie breaking**: Floating-point ties broken by original array position
951/// - **Special value exclusion**: NaN and null values excluded from rank assignment
952///
953/// ## Applications
954/// - **Statistical ranking**: Ranking continuous numerical data
955/// - **Scientific analysis**: Ordered ranking of experimental measurements
956/// - **Financial analysis**: Ranking performance metrics and indicators
957/// - **Data preprocessing**: Establishing ordinality for regression analysis
958///
959/// ## Examples
960/// ```rust,ignore
961/// use minarrow::FloatArray;
962/// use simd_kernels::kernels::window::rank_float;
963///
964/// let arr = FloatArray::<f64>::from_slice(&[3.14, 2.71, 1.41, f64::NAN]);
965/// let result = rank_float((&arr, 0, arr.len()));
966/// // Output: [3, 2, 1, 0] - NaN excluded, others ranked by value
967/// ```
968#[inline(always)]
969pub fn rank_float<T: Float + Copy>(window: FloatAVT<T>) -> IntegerArray<i32> {
970 let (arr, offset, len) = window;
971 let data = &arr.data[offset..offset + len];
972 let null_mask = if len != arr.data.len() {
973 &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
974 } else {
975 &arr.null_mask
976 };
977 rank_numeric(data, null_mask.as_ref(), |a, b| a.partial_cmp(b).unwrap())
978}
979
980/// Computes standard SQL ROW_NUMBER() ranking for string data with lexicographic ordering.
981///
982/// Assigns sequential rank values based on lexicographic string comparison, implementing
983/// ROW_NUMBER() semantics for textual data. Essential for alphabetical ranking and
984/// string-based analytical operations.
985///
986/// ## Parameters
987/// * `arr` - String array view containing textual values for ranking
988///
989/// ## Returns
990/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
991/// - **Success**: Rank values from 1 to n for valid string elements
992/// - **Error**: KernelError if capacity validation fails
993/// - Zero values for null string elements
994/// - Null mask indicating positions with valid ranks
995///
996/// ## String Ranking Semantics
997/// - **Lexicographic order**: Uses standard string comparison (dictionary order)
998/// - **Case sensitivity**: Comparisons are case-sensitive ("A" < "a")
999/// - **Unicode support**: Proper handling of UTF-8 encoded string data
1000/// - **ROW_NUMBER() behaviour**: Tied strings receive different ranks by position
1001///
1002/// ## Error Conditions
1003/// - **Capacity errors**: Returns KernelError if mask capacity validation fails
1004/// - **Memory allocation**: May fail with insufficient memory for large datasets
1005///
1006/// ## Use Cases
1007/// - **Alphabetical ranking**: Creating alphabetically ordered rankings
1008/// - **Text analysis**: Establishing lexicographic ordinality in textual data
1009/// - **Database operations**: SQL ROW_NUMBER() implementation for string columns
1010/// - **Sorting applications**: Providing ranking information for string sorting
1011///
1012/// ## Examples
1013/// ```rust,ignore
1014/// use minarrow::StringArray;
1015/// use simd_kernels::kernels::window::rank_str;
1016///
1017/// let arr = StringArray::<u32>::from_slice(&["zebra", "apple", "banana"]);
1018/// let result = rank_str((&arr, 0, arr.len())).unwrap();
1019/// // Output: [3, 1, 2] - lexicographic ranking
1020/// ```
1021#[inline(always)]
1022pub fn rank_str<T: Integer>(arr: StringAVT<T>) -> Result<IntegerArray<i32>, KernelError> {
1023 let (array, offset, len) = arr;
1024 let mask = array.null_mask.as_ref();
1025 let _ = confirm_mask_capacity(array.len(), mask)?;
1026
1027 // Gather (local_idx, string) pairs for valid elements in the window
1028 let mut tuples: Vec<(usize, &str)> = (0..len)
1029 .filter(|&i| mask.map_or(true, |m| unsafe { m.get_unchecked(offset + i) }))
1030 .map(|i| (i, unsafe { array.get_unchecked(offset + i) }.unwrap_or("")))
1031 .collect();
1032
1033 // Sort by string value
1034 tuples.sort_by(|a, b| a.1.cmp(&b.1));
1035
1036 let mut out = vec64![0i32; len];
1037 let mut out_mask = new_null_mask(len);
1038
1039 // Assign ranks (1-based), using local output indices
1040 for (rank, (i, _)) in tuples.iter().enumerate() {
1041 out[*i] = (rank + 1) as i32;
1042 unsafe { out_mask.set_unchecked(*i, true) };
1043 }
1044
1045 Ok(IntegerArray {
1046 data: out.into(),
1047 null_mask: Some(out_mask),
1048 })
1049}
1050
1051#[inline(always)]
1052fn dense_rank_numeric<T, F, G>(
1053 data: &[T],
1054 mask: Option<&Bitmask>,
1055 mut sort: F,
1056 mut eq: G,
1057) -> Result<IntegerArray<i32>, KernelError>
1058where
1059 T: Copy,
1060 F: FnMut(&T, &T) -> std::cmp::Ordering,
1061 G: FnMut(&T, &T) -> bool,
1062{
1063 let n = data.len();
1064 let _ = confirm_mask_capacity(n, mask)?;
1065 let mut uniqs: Vec<T> = (0..n)
1066 .filter(|&i| mask.map_or(true, |m| unsafe { m.get_unchecked(i) }))
1067 .map(|i| data[i])
1068 .collect();
1069
1070 uniqs.sort_by(&mut sort);
1071 uniqs.dedup_by(|a, b| eq(&*a, &*b));
1072
1073 let mut out = prealloc_vec::<i32>(n);
1074 let mut out_mask = Bitmask::new_set_all(n, false);
1075
1076 for i in 0..n {
1077 if mask.map_or(true, |m| unsafe { m.get_unchecked(i) }) {
1078 let rank = uniqs.binary_search_by(|x| sort(x, &data[i])).unwrap() + 1;
1079 out[i] = rank as i32;
1080 unsafe { out_mask.set_unchecked(i, true) };
1081 } else {
1082 out[i] = 0;
1083 }
1084 }
1085
1086 Ok(IntegerArray {
1087 data: out.into(),
1088 null_mask: Some(out_mask),
1089 })
1090}
1091
1092/// Computes SQL DENSE_RANK() ranking for integer data with consecutive rank assignment.
1093///
1094/// Assigns consecutive rank values where tied values receive identical ranks, implementing
1095/// SQL DENSE_RANK() semantics. Unlike standard ranking, eliminates gaps in rank sequence
1096/// when ties occur, providing compact rank numbering for analytical applications.
1097///
1098/// ## Parameters
1099/// * `window` - Integer array view containing values for dense ranking
1100///
1101/// ## Returns
1102/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1103/// - **Success**: Dense rank values with no gaps in sequence
1104/// - **Error**: KernelError if capacity validation fails
1105/// - Zero values for null elements
1106/// - Null mask indicating positions with valid ranks
1107///
1108/// ## Dense Ranking Semantics
1109/// - **DENSE_RANK() behaviour**: Tied values receive same rank, next rank is consecutive
1110/// - **No rank gaps**: Eliminates gaps that occur in standard RANK() function
1111/// - **Unique value counting**: Essentially counts distinct values in sorted order
1112/// - **Ascending order**: Smaller integer values receive lower (better) ranks
1113///
1114/// ## Comparison with RANK()
1115/// - **RANK()**: [1, 2, 2, 4] for values [10, 20, 20, 30]
1116/// - **DENSE_RANK()**: [1, 2, 2, 3] for values [10, 20, 20, 30]
1117///
1118/// ## Use Cases
1119/// - **Analytical queries**: SQL DENSE_RANK() window function implementation
1120/// - **Categorical ranking**: Creating compact categorical orderings
1121/// - **Percentile calculation**: Foundation for percentile computations without gaps
1122/// - **Data binning**: Assigning data points to consecutive bins based on value
1123///
1124/// ## Examples
1125/// ```rust,ignore
1126/// use minarrow::IntegerArray;
1127/// use simd_kernels::kernels::window::dense_rank_int;
1128///
1129/// let arr = IntegerArray::<i32>::from_slice(&[10, 30, 20, 30]);
1130/// let result = dense_rank_int((&arr, 0, arr.len())).unwrap();
1131/// // Output: [1, 3, 2, 3] - dense ranking with tied values
1132/// ```
1133#[inline(always)]
1134pub fn dense_rank_int<T: Ord + Copy>(
1135 window: IntegerAVT<T>,
1136) -> Result<IntegerArray<i32>, KernelError> {
1137 let (arr, offset, len) = window;
1138 let data = &arr.data[offset..offset + len];
1139 let null_mask = if len != arr.data.len() {
1140 &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1141 } else {
1142 &arr.null_mask
1143 };
1144 dense_rank_numeric(data, null_mask.as_ref(), |a, b| a.cmp(b), |a, b| a == b)
1145}
1146
1147/// Computes SQL DENSE_RANK() ranking for floating-point data with IEEE 754 compliance.
1148///
1149/// Implements dense ranking for floating-point values where tied values receive identical
1150/// consecutive ranks. Handles special floating-point values (NaN, infinity) according
1151/// to IEEE 754 standards while maintaining dense rank sequence properties.
1152///
1153/// ## Parameters
1154/// * `window` - Float array view containing values for dense ranking
1155///
1156/// ## Returns
1157/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1158/// - **Success**: Dense rank values with consecutive numbering
1159/// - **Error**: KernelError if capacity validation fails
1160/// - Zero values for null or NaN elements
1161/// - Null mask indicating positions with valid ranks
1162///
1163/// ## Applications
1164/// - **Scientific ranking**: Dense ranking of experimental measurements
1165/// - **Statistical analysis**: Percentile calculations without rank gaps
1166/// - **Financial modeling**: Dense ranking of performance metrics
1167/// - **Data preprocessing**: Creating ordinal encodings for continuous variables
1168///
1169/// ## Examples
1170/// ```rust,ignore
1171/// use minarrow::FloatArray;
1172/// use simd_kernels::kernels::window::dense_rank_float;
1173///
1174/// let arr = FloatArray::<f64>::from_slice(&[1.5, 3.14, 2.71, 3.14]);
1175/// let result = dense_rank_float((&arr, 0, arr.len())).unwrap();
1176/// // Output: [1, 3, 2, 3] - dense ranking with tied 3.14 values
1177/// ```
1178#[inline(always)]
1179pub fn dense_rank_float<T: Float + Copy>(
1180 window: FloatAVT<T>,
1181) -> Result<IntegerArray<i32>, KernelError> {
1182 let (arr, offset, len) = window;
1183 let data = &arr.data[offset..offset + len];
1184 let null_mask = if len != arr.data.len() {
1185 &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1186 } else {
1187 &arr.null_mask
1188 };
1189 dense_rank_numeric(
1190 data,
1191 null_mask.as_ref(),
1192 |a, b| a.partial_cmp(b).unwrap(),
1193 |a, b| a == b,
1194 )
1195}
1196
1197/// Computes SQL DENSE_RANK() ranking for string data with lexicographic dense ordering.
1198///
1199/// Implements dense ranking for string values using lexicographic comparison, where
1200/// identical strings receive the same rank and subsequent ranks remain consecutive.
1201/// Essential for alphabetical dense ranking and textual categorical analysis.
1202///
1203/// ## Parameters
1204/// * `arr` - String array view containing textual values for dense ranking
1205///
1206/// ## Returns
1207/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1208/// - **Success**: Dense rank values with consecutive sequence
1209/// - **Error**: KernelError if capacity validation fails
1210/// - Zero values for null string elements
1211/// - Null mask indicating positions with valid ranks
1212///
1213/// ## Dense String Ranking
1214/// - **DENSE_RANK() semantics**: Identical strings receive same rank, no rank gaps
1215/// - **Lexicographic ordering**: Standard dictionary-style string comparison
1216/// - **Case sensitivity**: Maintains case-sensitive comparison ("Apple" ≠ "apple")
1217/// - **UTF-8 support**: Proper handling of Unicode string sequences
1218///
1219/// ## Use Cases
1220/// - **Alphabetical dense ranking**: Creating compact alphabetical orderings
1221/// - **Categorical encoding**: Converting string categories to dense integer codes
1222/// - **Text analytics**: Establishing lexicographic ordinality for text processing
1223/// - **Database operations**: SQL DENSE_RANK() for string-valued columns
1224///
1225/// ## Examples
1226/// ```rust,ignore
1227/// use minarrow::StringArray;
1228/// use simd_kernels::kernels::window::dense_rank_str;
1229///
1230/// let arr = StringArray::<u32>::from_slice(&["banana", "apple", "cherry", "apple"]);
1231/// let result = dense_rank_str((&arr, 0, arr.len())).unwrap();
1232/// // Output: [2, 1, 3, 1] - dense ranking with tied "apple" values
1233/// ```
1234#[inline(always)]
1235pub fn dense_rank_str<T: Integer>(arr: StringAVT<T>) -> Result<IntegerArray<i32>, KernelError> {
1236 let (array, offset, len) = arr;
1237 let mask = array.null_mask.as_ref();
1238 let _ = confirm_mask_capacity(array.len(), mask)?;
1239
1240 // Collect all unique valid values in window
1241 let mut vals: Vec<&str> = (0..len)
1242 .filter(|&i| mask.map_or(true, |m| unsafe { m.get_unchecked(offset + i) }))
1243 .map(|i| unsafe { array.get_unchecked(offset + i) }.unwrap_or(""))
1244 .collect();
1245 vals.sort();
1246 vals.dedup();
1247
1248 let mut out = prealloc_vec::<i32>(len);
1249 let mut out_mask = Bitmask::new_set_all(len, false);
1250
1251 for i in 0..len {
1252 let valid = mask.map_or(true, |m| unsafe { m.get_unchecked(offset + i) });
1253 if valid {
1254 let rank = vals
1255 .binary_search(&unsafe { array.get_unchecked(offset + i) }.unwrap_or(""))
1256 .unwrap()
1257 + 1;
1258 out[i] = rank as i32;
1259 unsafe { out_mask.set_unchecked(i, true) };
1260 } else {
1261 out[i] = 0;
1262 }
1263 }
1264
1265 Ok(IntegerArray {
1266 data: out.into(),
1267 null_mask: Some(out_mask),
1268 })
1269}
1270
1271// Lag / Lead / Shift kernels
1272
1273#[inline(always)]
1274fn shift_with_bounds<T: Copy>(
1275 data: &[T],
1276 mask: Option<&Bitmask>,
1277 len: usize,
1278 offset_fn: impl Fn(usize) -> Option<usize>,
1279 default: T,
1280) -> (Vec64<T>, Bitmask) {
1281 let mut out = prealloc_vec::<T>(len);
1282 let mut out_mask = Bitmask::new_set_all(len, false);
1283 for i in 0..len {
1284 if let Some(j) = offset_fn(i) {
1285 out[i] = data[j];
1286 let is_valid = mask.map_or(true, |m| unsafe { m.get_unchecked(j) });
1287 unsafe { out_mask.set_unchecked(i, is_valid) };
1288 } else {
1289 out[i] = default;
1290 }
1291 }
1292 (out, out_mask)
1293}
1294
1295#[inline(always)]
1296fn shift_str_with_bounds<T: Integer>(
1297 arr: StringAVT<T>,
1298 offset_fn: impl Fn(usize) -> Option<usize>,
1299) -> Result<StringArray<T>, KernelError> {
1300 let (array, offset, len) = arr;
1301 let src_mask = array.null_mask.as_ref();
1302 let _ = confirm_mask_capacity(array.len(), src_mask)?;
1303
1304 // Determine offsets and total bytes required
1305 let mut offsets = Vec64::<T>::with_capacity(len + 1);
1306 unsafe {
1307 offsets.set_len(len + 1);
1308 }
1309 offsets[0] = T::zero();
1310
1311 let mut total_bytes = 0;
1312 let mut string_lengths = vec![0usize; len];
1313
1314 for i in 0..len {
1315 let byte_len = if let Some(j) = offset_fn(i) {
1316 let src_idx = offset + j;
1317 let valid = src_mask.map_or(true, |m| unsafe { m.get_unchecked(src_idx) });
1318 if valid {
1319 unsafe { array.get_unchecked(src_idx).unwrap_or("").len() }
1320 } else {
1321 0
1322 }
1323 } else {
1324 0
1325 };
1326 total_bytes += byte_len;
1327 string_lengths[i] = byte_len;
1328 offsets[i + 1] = T::from_usize(total_bytes);
1329 }
1330
1331 // Allocate data buffer
1332 let mut data = Vec64::<u8>::with_capacity(total_bytes);
1333 let mut out_mask = Bitmask::new_set_all(len, false);
1334
1335 // Write string content
1336 for i in 0..len {
1337 if let Some(j) = offset_fn(i) {
1338 let src_idx = offset + j;
1339 let valid = src_mask.map_or(true, |m| unsafe { m.get_unchecked(src_idx) });
1340 if valid {
1341 let s = unsafe { array.get_unchecked(src_idx).unwrap_or("") };
1342 data.extend_from_slice(s.as_bytes());
1343 unsafe { out_mask.set_unchecked(i, true) };
1344 continue;
1345 }
1346 }
1347 // Not valid or OOB write nothing
1348 }
1349
1350 Ok(StringArray {
1351 offsets: offsets.into(),
1352 data: data.into(),
1353 null_mask: Some(out_mask),
1354 })
1355}
1356
1357// Integer
1358
1359/// Accesses values from previous positions in integer arrays with configurable offset.
1360///
1361/// Implements SQL LAG() window function semantics, retrieving values from earlier positions
1362/// in the array sequence. Essential for time series analysis, trend detection, and
1363/// comparative analytics requiring access to historical data points.
1364///
1365/// ## Parameters
1366/// * `window` - Integer array view containing sequential data for lag access
1367/// * `n` - Lag offset specifying how many positions to look backward
1368///
1369/// ## Returns
1370/// Returns an `IntegerArray<T>` containing:
1371/// - Values from n positions earlier in the sequence
1372/// - Default values for positions where lag source is unavailable
1373/// - Null mask indicating validity of lagged values
1374///
1375/// ## Examples
1376/// ```rust,ignore
1377/// use minarrow::IntegerArray;
1378/// use simd_kernels::kernels::window::lag_int;
1379///
1380/// let arr = IntegerArray::<i32>::from_slice(&[10, 20, 30, 40]);
1381/// let result = lag_int((&arr, 0, arr.len()), 1);
1382/// ```
1383#[inline]
1384pub fn lag_int<T: Copy + Default>(window: IntegerAVT<T>, n: usize) -> IntegerArray<T> {
1385 let (arr, offset, len) = window;
1386 let data_window = &arr.data[offset..offset + len];
1387 let mask_window = if len != arr.data.len() {
1388 arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1389 } else {
1390 arr.null_mask.clone()
1391 };
1392 let (data, null_mask) = shift_with_bounds(
1393 data_window,
1394 mask_window.as_ref(),
1395 len,
1396 |i| if i >= n { Some(i - n) } else { None },
1397 T::default(),
1398 );
1399 IntegerArray {
1400 data: data.into(),
1401 null_mask: Some(null_mask),
1402 }
1403}
1404
1405/// Accesses values from future positions in integer arrays with configurable offset.
1406///
1407/// Implements SQL LEAD() window function semantics, retrieving values from later positions
1408/// in the array sequence. Essential for predictive analytics, forward-looking comparisons,
1409/// and temporal analysis requiring access to future data points.
1410///
1411/// ## Parameters
1412/// * `window` - Integer array view containing sequential data for lead access
1413/// * `n` - Lead offset specifying how many positions to look forward
1414///
1415/// ## Returns
1416/// Returns an `IntegerArray<T>` containing:
1417/// - Values from n positions later in the sequence
1418/// - Default values for positions where lead source is unavailable
1419/// - Null mask indicating validity of lead values
1420///
1421/// ## Examples
1422/// ```rust,ignore
1423/// use minarrow::IntegerArray;
1424/// use simd_kernels::kernels::window::lead_int;
1425///
1426/// let arr = IntegerArray::<i32>::from_slice(&[10, 20, 30, 40]);
1427/// let result = lead_int((&arr, 0, arr.len()), 2);
1428/// ```
1429#[inline]
1430pub fn lead_int<T: Copy + Default>(window: IntegerAVT<T>, n: usize) -> IntegerArray<T> {
1431 let (arr, offset, len) = window;
1432 let data_window = &arr.data[offset..offset + len];
1433 let mask_window = if len != arr.data.len() {
1434 arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1435 } else {
1436 arr.null_mask.clone()
1437 };
1438 let (data, null_mask) = shift_with_bounds(
1439 data_window,
1440 mask_window.as_ref(),
1441 len,
1442 |i| if i + n < len { Some(i + n) } else { None },
1443 T::default(),
1444 );
1445 IntegerArray {
1446 data: data.into(),
1447 null_mask: Some(null_mask),
1448 }
1449}
1450
1451/// Accesses values from previous positions in floating-point arrays with IEEE 754 compliance.
1452///
1453/// Implements SQL LAG() function for floating-point data, retrieving values from earlier
1454/// positions while maintaining IEEE 754 semantics for special values (NaN, infinity).
1455/// Critical for time series analysis and numerical sequence processing.
1456///
1457/// ## Parameters
1458/// * `window` - Float array view containing sequential floating-point data
1459/// * `n` - Lag offset specifying backward position distance
1460///
1461/// ## Returns
1462/// Returns a `FloatArray<T>` containing:
1463/// - Floating-point values from n positions earlier
1464/// - Zero values for positions with insufficient history
1465/// - Null mask reflecting lag validity and special value handling
1466///
1467/// ## Examples
1468/// ```rust,ignore
1469/// use minarrow::FloatArray;
1470/// use simd_kernels::kernels::window::lag_float;
1471///
1472/// let arr = FloatArray::<f64>::from_slice(&[1.0, 2.5, 3.14, 4.7]);
1473/// let result = lag_float((&arr, 0, arr.len()), 1);
1474/// ```
1475#[inline]
1476pub fn lag_float<T: Copy + num_traits::Zero>(window: FloatAVT<T>, n: usize) -> FloatArray<T> {
1477 let (arr, offset, len) = window;
1478 let data_window = &arr.data[offset..offset + len];
1479 let mask_window = if len != arr.data.len() {
1480 arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1481 } else {
1482 arr.null_mask.clone()
1483 };
1484 let (data, null_mask) = shift_with_bounds(
1485 data_window,
1486 mask_window.as_ref(),
1487 len,
1488 |i| if i >= n { Some(i - n) } else { None },
1489 T::zero(),
1490 );
1491 FloatArray {
1492 data: data.into(),
1493 null_mask: Some(null_mask),
1494 }
1495}
1496
1497/// Accesses values from future positions in floating-point arrays with IEEE 754 compliance.
1498///
1499/// Implements SQL LEAD() function for floating-point data, retrieving values from later
1500/// positions while preserving IEEE 754 semantics. Essential for forward-looking analysis
1501/// and predictive modeling with continuous numerical data.
1502///
1503/// ## Parameters
1504/// * `window` - Float array view containing sequential floating-point data
1505/// * `n` - Lead offset specifying forward position distance
1506///
1507/// ## Returns
1508/// Returns a `FloatArray<T>` containing:
1509/// - Floating-point values from n positions later
1510/// - Zero values for positions beyond available future
1511/// - Null mask indicating lead validity and special value propagation
1512///
1513/// ## Use Cases
1514/// - **Predictive analytics**: Accessing future values for comparison and modeling
1515/// - **Signal analysis**: Forward-looking operations in digital signal processing
1516/// - **Financial modeling**: Computing forward returns and future value analysis
1517/// - **Scientific computing**: Implementing forward difference schemes
1518///
1519/// ## Examples
1520/// ```rust,ignore
1521/// use minarrow::FloatArray;
1522/// use simd_kernels::kernels::window::lead_float;
1523///
1524/// let arr = FloatArray::<f32>::from_slice(&[1.1, 2.2, 3.3, 4.4]);
1525/// let result = lead_float((&arr, 0, arr.len()), 2);
1526/// // Output: [3.3, 4.4, 0.0, 0.0] - lead by 2 positions
1527/// ```
1528#[inline]
1529pub fn lead_float<T: Copy + num_traits::Zero>(window: FloatAVT<T>, n: usize) -> FloatArray<T> {
1530 let (arr, offset, len) = window;
1531 let data_window = &arr.data[offset..offset + len];
1532 let mask_window = if len != arr.data.len() {
1533 arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1534 } else {
1535 arr.null_mask.clone()
1536 };
1537 let (data, null_mask) = shift_with_bounds(
1538 data_window,
1539 mask_window.as_ref(),
1540 len,
1541 |i| if i + n < len { Some(i + n) } else { None },
1542 T::zero(),
1543 );
1544 FloatArray {
1545 data: data.into(),
1546 null_mask: Some(null_mask),
1547 }
1548}
1549
1550// String
1551
1552/// Accesses string values from previous positions with UTF-8 string handling.
1553///
1554/// Implements SQL LAG() function for string data, retrieving textual values from earlier
1555/// positions in the array sequence. Essential for textual analysis, sequential string
1556/// processing, and comparative text analytics.
1557///
1558/// ## Parameters
1559/// * `arr` - String array view containing sequential textual data
1560/// * `n` - Lag offset specifying backward position distance
1561///
1562/// ## Returns
1563/// Returns `Result<StringArray<T>, KernelError>` containing:
1564/// - **Success**: String values from n positions earlier
1565/// - **Error**: KernelError if string processing fails
1566/// - Empty strings for positions with insufficient history
1567/// - Null mask indicating lag validity and source availability
1568///
1569/// ## String Lag Semantics
1570/// - **UTF-8 preservation**: Maintains proper UTF-8 encoding throughout operation
1571/// - **Null propagation**: Null strings in source positions result in null outputs
1572/// - **Memory management**: Efficient string copying and allocation strategies
1573/// - **Boundary handling**: Positions without history receive empty string defaults
1574///
1575/// ## Applications
1576/// - **Text analysis**: Comparing current text with previous entries
1577/// - **Sequential processing**: Analysing patterns in ordered textual data
1578/// - **Log analysis**: Accessing previous log entries for context
1579/// - **Natural language processing**: Context-aware text processing with history
1580///
1581/// ## Examples
1582/// ```rust,ignore
1583/// use minarrow::StringArray;
1584/// use simd_kernels::kernels::window::lag_str;
1585///
1586/// let arr = StringArray::<u32>::from_slice(&["first", "second", "third"]);
1587/// let result = lag_str((&arr, 0, arr.len()), 1).unwrap();
1588/// // Output: ["", "first", "second"] - strings lagged by 1 position
1589/// ```
1590#[inline]
1591pub fn lag_str<T: Integer>(arr: StringAVT<T>, n: usize) -> Result<StringArray<T>, KernelError> {
1592 shift_str_with_bounds(arr, |i| if i >= n { Some(i - n) } else { None })
1593}
1594
1595/// Accesses string values from future positions with efficient UTF-8 processing.
1596///
1597/// Implements SQL LEAD() function for string data, retrieving textual values from later
1598/// positions in the array sequence. Critical for forward-looking text analysis and
1599/// sequential string pattern recognition.
1600///
1601/// ## Parameters
1602/// * `arr` - String array view containing sequential textual data
1603/// * `n` - Lead offset specifying forward position distance
1604///
1605/// ## Returns
1606/// Returns `Result<StringArray<T>, KernelError>` containing:
1607/// - **Success**: String values from n positions later
1608/// - **Error**: KernelError if string processing encounters issues
1609/// - Empty strings for positions beyond available future
1610/// - Null mask indicating lead validity and source availability
1611///
1612/// ## Examples
1613/// ```rust,ignore
1614/// use minarrow::StringArray;
1615/// use simd_kernels::kernels::window::lead_str;
1616///
1617/// let arr = StringArray::<u32>::from_slice(&["alpha", "beta", "gamma"]);
1618/// let result = lead_str((&arr, 0, arr.len()), 1).unwrap();
1619/// ```
1620#[inline]
1621pub fn lead_str<T: Integer>(arr: StringAVT<T>, n: usize) -> Result<StringArray<T>, KernelError> {
1622 let (_array, _offset, len) = arr;
1623 shift_str_with_bounds(arr, |i| if i + n < len { Some(i + n) } else { None })
1624}
1625
1626// Shift variants
1627/// Shifts integer array elements by specified offset with bidirectional support.
1628///
1629/// Provides unified interface for both LAG and LEAD operations through signed offset
1630/// parameter. Positive offsets implement LEAD semantics (forward shift), negative
1631/// offsets implement LAG semantics (backward shift), enabling flexible positional access.
1632///
1633/// ## Parameters
1634/// * `window` - Integer array view containing data for shifting
1635/// * `offset` - Signed offset: positive for LEAD (forward), negative for LAG (backward), zero for identity
1636///
1637/// ## Returns
1638/// Returns an `IntegerArray<T>` containing:
1639/// - Shifted integer values according to offset direction
1640/// - Default values for positions beyond available data
1641/// - Null mask reflecting validity of shifted positions
1642///
1643/// ## Shift Semantics
1644/// - **Positive offset**: LEAD operation (shift left, access future values)
1645/// - **Negative offset**: LAG operation (shift right, access past values)
1646/// - **Zero offset**: Identity operation (returns original array)
1647/// - **Boundary handling**: Out-of-bounds positions receive default values
1648///
1649/// ## Applications
1650/// - **Time series analysis**: Flexible temporal shifting for comparison operations
1651/// - **Sequence processing**: Bidirectional access in ordered integer sequences
1652/// - **Algorithm implementation**: Building blocks for complex windowing operations
1653/// - **Data transformation**: Positional transformations in numerical datasets
1654///
1655/// ## Examples
1656/// ```rust,ignore
1657/// use minarrow::IntegerArray;
1658/// use simd_kernels::kernels::window::shift_int;
1659///
1660/// let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4]);
1661/// let lag = shift_int((&arr, 0, arr.len()), -1); // LAG by 1
1662/// let lead = shift_int((&arr, 0, arr.len()), 2); // LEAD by 2
1663/// // lag: [0, 1, 2, 3] - previous values
1664/// // lead: [3, 4, 0, 0] - future values
1665/// ```
1666#[inline(always)]
1667pub fn shift_int<T: Copy + Default>(window: IntegerAVT<T>, offset: isize) -> IntegerArray<T> {
1668 let (arr, win_offset, win_len) = window;
1669 if offset == 0 {
1670 return IntegerArray {
1671 data: Vec64::from_slice(&arr.data[win_offset..win_offset + win_len]).into(),
1672 null_mask: if win_len != arr.data.len() {
1673 arr.null_mask
1674 .as_ref()
1675 .map(|m| m.slice_clone(win_offset, win_len))
1676 } else {
1677 arr.null_mask.clone()
1678 },
1679 };
1680 } else if offset > 0 {
1681 lead_int((arr, win_offset, win_len), offset as usize)
1682 } else {
1683 lag_int((arr, win_offset, win_len), offset.unsigned_abs())
1684 }
1685}
1686
1687/// Shifts floating-point array elements with IEEE 754 compliance and bidirectional support.
1688///
1689/// Unified shifting interface for floating-point data supporting both LAG and LEAD semantics
1690/// through signed offset parameter. Maintains IEEE 754 standards for special value handling
1691/// while providing efficient bidirectional positional access.
1692///
1693/// ## Parameters
1694/// * `window` - Float array view containing data for shifting
1695/// * `offset` - Signed offset: positive for LEAD (forward), negative for LAG (backward), zero for identity
1696///
1697/// ## Returns
1698/// Returns a `FloatArray<T>` containing:
1699/// - Shifted floating-point values preserving IEEE 754 semantics
1700/// - Zero values for positions beyond data boundaries
1701/// - Null mask indicating validity of shifted positions
1702///
1703/// ## Examples
1704/// ```rust,ignore
1705/// use minarrow::FloatArray;
1706/// use simd_kernels::kernels::window::shift_float;
1707///
1708/// let arr = FloatArray::<f64>::from_slice(&[1.1, 2.2, 3.3, 4.4]);
1709/// let backward = shift_float((&arr, 0, arr.len()), -2); // LAG by 2
1710/// let forward = shift_float((&arr, 0, arr.len()), 1); // LEAD by 1
1711/// ```
1712#[inline(always)]
1713pub fn shift_float<T: Copy + num_traits::Zero>(
1714 window: FloatAVT<T>,
1715 offset: isize,
1716) -> FloatArray<T> {
1717 let (arr, win_offset, win_len) = window;
1718 if offset == 0 {
1719 return FloatArray {
1720 data: Vec64::from_slice(&arr.data[win_offset..win_offset + win_len]).into(),
1721 null_mask: if win_len != arr.data.len() {
1722 arr.null_mask
1723 .as_ref()
1724 .map(|m| m.slice_clone(win_offset, win_len))
1725 } else {
1726 arr.null_mask.clone()
1727 },
1728 };
1729 } else if offset > 0 {
1730 lead_float((arr, win_offset, win_len), offset as usize)
1731 } else {
1732 lag_float((arr, win_offset, win_len), offset.unsigned_abs())
1733 }
1734}
1735
1736/// Shifts string array elements with UTF-8 integrity and bidirectional offset support.
1737///
1738/// String shifting function supporting both LAG and LEAD operations through
1739/// signed offset parameter. Maintains UTF-8 encoding integrity while providing flexible
1740/// positional access for textual sequence analysis.
1741///
1742/// ## Parameters
1743/// * `arr` - String array view containing textual data for shifting
1744/// * `shift_offset` - Signed offset: positive for LEAD (forward), negative for LAG (backward), zero for identity
1745///
1746/// ## Returns
1747/// Returns `Result<StringArray<T>, KernelError>` containing:
1748/// - **Success**: Shifted string values maintaining UTF-8 integrity
1749/// - **Error**: KernelError if string processing encounters issues
1750/// - Empty strings for positions beyond data boundaries
1751/// - Null mask reflecting validity of shifted string positions
1752///
1753/// ## Shift Semantics
1754/// - **Positive offset**: LEAD operation accessing future string values
1755/// - **Negative offset**: LAG operation accessing historical string values
1756/// - **Zero offset**: Identity operation (returns cloned array slice)
1757/// - **Boundary conditions**: Out-of-range positions produce empty strings
1758///
1759/// ## Examples
1760/// ```rust,ignore
1761/// use minarrow::StringArray;
1762/// use simd_kernels::kernels::window::shift_str;
1763///
1764/// let arr = StringArray::<u32>::from_slice(&["one", "two", "three"]);
1765/// let back = shift_str((&arr, 0, arr.len()), -1).unwrap(); // LAG
1766/// let forward = shift_str((&arr, 0, arr.len()), 1).unwrap(); // LEAD
1767/// // back: ["", "one", "two"]
1768/// // forward: ["two", "three", ""]
1769/// ```
1770#[inline(always)]
1771pub fn shift_str<T: Integer>(
1772 arr: StringAVT<T>,
1773 shift_offset: isize,
1774) -> Result<StringArray<T>, KernelError> {
1775 if shift_offset == 0 {
1776 // Return this slice's window as a cloned StringArray
1777 let (array, off, len) = arr;
1778 Ok(array.slice_clone(off, len))
1779 } else if shift_offset > 0 {
1780 lead_str(arr, shift_offset as usize)
1781 } else {
1782 lag_str(arr, shift_offset.unsigned_abs())
1783 }
1784}
1785
#[cfg(test)]
mod tests {
    use minarrow::structs::variants::float::FloatArray;
    use minarrow::structs::variants::integer::IntegerArray;
    use minarrow::structs::variants::string::StringArray;
    use minarrow::{Bitmask, BooleanArray};

    use super::*;

    // ─────────────────────────── Helpers ───────────────────────────

    /// Creates a `Bitmask` whose bit `i` mirrors `bits[i]`.
    fn bm(bits: &[bool]) -> Bitmask {
        let mut mask = Bitmask::new_set_all(bits.len(), false);
        for (idx, flag) in bits.iter().copied().enumerate() {
            mask.set(idx, flag);
        }
        mask
    }

    /// Asserts that an `IntegerArray<T>` holds exactly `values` and that the first
    /// `valid.len()` bits of its null mask match `valid`. Panics if the mask is absent.
    fn expect_int<T: PartialEq + std::fmt::Debug + Clone>(
        arr: &IntegerArray<T>,
        values: &[T],
        valid: &[bool],
    ) {
        assert_eq!(arr.data.as_slice(), values);
        let mask = arr.null_mask.as_ref().expect("mask missing");
        for (i, expected_bit) in valid.iter().copied().enumerate() {
            assert_eq!(mask.get(i), expected_bit, "mask bit {}", i);
        }
    }

    /// Asserts that a `FloatArray<T>` matches `values` element-wise within `eps` and
    /// that the first `valid.len()` bits of its null mask match `valid`.
    fn expect_float<T: num_traits::Float + std::fmt::Debug>(
        arr: &FloatArray<T>,
        values: &[T],
        valid: &[bool],
        eps: T,
    ) {
        let actual = arr.data.as_slice();
        assert_eq!(actual.len(), values.len());
        for (a, b) in actual.iter().zip(values) {
            assert!((*a - *b).abs() <= eps, "value mismatch {:?} vs {:?}", a, b);
        }
        let mask = arr.null_mask.as_ref().expect("mask missing");
        for (i, expected_bit) in valid.iter().copied().enumerate() {
            assert_eq!(mask.get(i), expected_bit);
        }
    }

    // ───────────────────────── Rolling kernels ─────────────────────────

    /// Window-3 rolling sum over a dense integer array.
    #[test]
    fn test_rolling_sum_int_basic() {
        let input = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4, 5]);
        let n = input.len();
        let out = rolling_sum_int((&input, 0, n), 3);
        expect_int(&out, &[0, 0, 6, 9, 12], &[false, false, true, true, true]);
    }

    /// Window-2 rolling sum where the second input element is null.
    #[test]
    fn test_rolling_sum_int_masked() {
        let mut input = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4]);
        input.null_mask = Some(bm(&[true, false, true, true]));
        let n = input.len();
        let out = rolling_sum_int((&input, 0, n), 2);
        // A slot is flagged valid only when its window holds no nulls.
        let want_valid = [false, false, false, true];
        expect_int(&out, &[0, 0, 3, 7], &want_valid);
    }

    /// Window-2 rolling sum over a dense `f32` array.
    #[test]
    fn test_rolling_sum_float() {
        let input = FloatArray::<f32>::from_slice(&[1.0, 2.0, 3.0]);
        let n = input.len();
        let out = rolling_sum_float((&input, 0, n), 2);
        expect_float(&out, &[0.0, 0.0, 5.0], &[false, false, true], 1e-6f32);
    }

    /// Rolling sum over booleans counts the `true` entries inside each window.
    #[test]
    fn test_rolling_sum_bool() {
        let flags = BooleanArray::from_slice(&[true, true, false, true]);
        let n = flags.len();
        let out = rolling_sum_bool((&flags, 0, n), 2);
        expect_int(&out, &[0, 0, 1, 1], &[false, false, true, true]);
    }

    /// Exercises rolling min, max, mean and count on one shared input.
    #[test]
    fn test_rolling_min_max_mean_count() {
        let input = IntegerArray::<i32>::from_slice(&[3, 1, 4, 1, 5]);
        let n = input.len();
        let tail_valid = [false, false, true, true, true];

        // Minimum over a window of 2.
        let mins = rolling_min_int((&input, 0, n), 2);
        expect_int(&mins, &[0, 0, 1, 1, 1], &tail_valid);

        // Maximum over a window of 3.
        let maxs = rolling_max_int((&input, 0, n), 3);
        expect_int(&maxs, &[0, 0, 4, 4, 5], &tail_valid);

        // Mean over a window of 2.
        let means = rolling_mean_int((&input, 0, n), 2);
        expect_float(&means, &[0.0, 0.0, 2.5, 2.5, 3.0], &tail_valid, 1e-12);

        // Count needs only the window bounds, not the data.
        let counts = rolling_count((0, 5), 3);
        expect_int(&counts, &[0, 0, 3, 3, 3], &tail_valid);
    }

    // ───────────────────────── Rank / Dense-rank ─────────────────────────

    /// Ranks of three distinct integers.
    #[test]
    fn test_rank_int_basic() {
        let input = IntegerArray::<i32>::from_slice(&[30, 10, 20]);
        let n = input.len();
        let ranks = rank_int((&input, 0, n));
        expect_int(&ranks, &[3, 1, 2], &[true, true, true]);
    }

    /// A null input slot is expected to yield rank 0 with an unset validity bit.
    #[test]
    fn test_rank_float_with_nulls() {
        let mut input = FloatArray::<f64>::from_slice(&[2.0, 1.0, 3.0]);
        input.null_mask = Some(bm(&[true, false, true]));
        let n = input.len();
        let ranks = rank_float((&input, 0, n));
        expect_int(&ranks, &[2, 0, 3], &[true, false, true]);
    }

    /// Equal strings share a dense rank and no rank value is skipped.
    #[test]
    fn test_dense_rank_str_duplicates() {
        let input = StringArray::<u32>::from_slice(&["b", "a", "b", "c"]);
        let n = input.len();
        let ranks = dense_rank_str((&input, 0, n)).unwrap();
        expect_int(&ranks, &[2, 1, 2, 3], &[true; 4]);
    }

    /// Same as above, but ranking only the inner window "b", "a", "b", "c"
    /// of ["x", "b", "a", "b", "c", "y"].
    #[test]
    fn test_dense_rank_str_duplicates_chunk() {
        let input = StringArray::<u32>::from_slice(&["x", "b", "a", "b", "c", "y"]);
        let ranks = dense_rank_str((&input, 1, 4)).unwrap();
        expect_int(&ranks, &[2, 1, 2, 3], &[true; 4]);
    }

    // ───────────────────────── Lag / Lead / Shift ─────────────────────────

    /// LAG by 1 and LEAD by 2 over a dense integer array.
    #[test]
    fn test_lag_lead_int() {
        let input = IntegerArray::<i32>::from_slice(&[10, 20, 30, 40]);
        let n = input.len();

        let lagged = lag_int((&input, 0, n), 1);
        expect_int(&lagged, &[0, 10, 20, 30], &[false, true, true, true]);

        let led = lead_int((&input, 0, n), 2);
        expect_int(&led, &[30, 40, 0, 0], &[true, true, false, false]);
    }

    /// `shift_float` with zero, positive and negative offsets.
    #[test]
    fn test_shift_float_positive_negative_zero() {
        let input = FloatArray::<f32>::from_slice(&[1.0, 2.0, 3.0]);
        let n = input.len();

        // Zero offset must reproduce the data exactly.
        let unchanged = shift_float((&input, 0, n), 0);
        assert_eq!(unchanged.data, input.data);

        let forward = shift_float((&input, 0, n), 1);
        expect_float(&forward, &[2.0, 3.0, 0.0], &[true, true, false], 1e-6f32);

        let backward = shift_float((&input, 0, n), -1);
        expect_float(&backward, &[0.0, 1.0, 2.0], &[false, true, true], 1e-6f32);
    }

    /// LAG and LEAD by 1 over a dense string array.
    #[test]
    fn test_lag_lead_str() {
        let input = StringArray::<u32>::from_slice(&["a", "b", "c"]);
        let n = input.len();

        let lagged = lag_str((&input, 0, n), 1).unwrap();
        let got_lag: Vec<_> = (0..3).map(|i| lagged.get(i)).collect();
        assert_eq!(got_lag, [None, Some("a"), Some("b")]);

        let led = lead_str((&input, 0, n), 1).unwrap();
        let got_lead: Vec<_> = (0..3).map(|i| led.get(i)).collect();
        assert_eq!(got_lead, [Some("b"), Some("c"), None]);
    }

    /// LAG and LEAD restricted to the inner window "a", "b", "c" of
    /// ["x", "a", "b", "c", "y"]; neighbours outside the window must not leak in.
    #[test]
    fn test_lag_lead_str_chunk() {
        let input = StringArray::<u32>::from_slice(&["x", "a", "b", "c", "y"]);

        let lagged = lag_str((&input, 1, 3), 1).unwrap();
        let got_lag: Vec<_> = (0..3).map(|i| lagged.get(i)).collect();
        assert_eq!(got_lag, [None, Some("a"), Some("b")]);

        let led = lead_str((&input, 1, 3), 1).unwrap();
        let got_lead: Vec<_> = (0..3).map(|i| led.get(i)).collect();
        assert_eq!(got_lead, [Some("b"), Some("c"), None]);
    }

    /// Rolling sum with degenerate window sizes: 0, 1 and larger than the input.
    #[test]
    fn test_rolling_sum_int_edge_windows() {
        let input = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4, 5]);
        let n = input.len();

        // Window of 0: every value zero, every slot invalid.
        let empty_window = rolling_sum_int((&input, 0, n), 0);
        assert_eq!(empty_window.data.as_slice(), &[0; 5]);
        assert!(empty_window.null_mask.as_ref().unwrap().all_unset());

        // Window of 1: output equals input, every slot valid.
        let unit_window = rolling_sum_int((&input, 0, n), 1);
        assert_eq!(unit_window.data.as_slice(), &[1, 2, 3, 4, 5]);
        assert!(unit_window.null_mask.as_ref().unwrap().all_set());

        // Window longer than the input: every value zero, every slot invalid.
        let oversized_window = rolling_sum_int((&input, 0, n), 10);
        assert_eq!(oversized_window.data.as_slice(), &[0; 5]);
        assert!(oversized_window.null_mask.as_ref().unwrap().all_unset());
    }

    /// A null inside the window invalidates the slot while the value still reflects
    /// the non-null members.
    #[test]
    fn test_rolling_sum_float_masked_nulls_propagate() {
        let mut input = FloatArray::<f32>::from_slice(&[1.0, 2.0, 3.0, 4.0]);
        // Index 2 is null.
        input.null_mask = Some(bm(&[true, true, false, true]));
        let n = input.len();
        let out = rolling_sum_float((&input, 0, n), 2);
        // Slots 0 and 1 are expected to be zeroed and invalid.
        // Slots 2 and 3 both include the null at index 2, so they are invalid,
        // yet they still carry the non-null part of the window: 2.0 and 4.0.
        expect_float(&out, &[0.0, 0.0, 2.0, 4.0], &[false; 4], 1e-6);
    }

    /// Boolean rolling sum where the second element is null.
    #[test]
    fn test_rolling_sum_bool_with_nulls() {
        let mut flags = BooleanArray::from_slice(&[true, false, true, true]);
        flags.null_mask = Some(bm(&[true, false, true, true]));
        let n = flags.len();
        let out = rolling_sum_bool((&flags, 0, n), 2);
        // Of the windows [t,f], [f,t], [t,t] only the last contains no null.
        expect_int(&out, &[0, 0, 1, 2], &[false, false, false, true]);
    }

    /// LAG must carry a source null along with the value it shifts.
    #[test]
    fn test_lag_str_null_propagation() {
        let mut input = StringArray::<u32>::from_slice(&["x", "y", "z"]);
        input.null_mask = Some(bm(&[true, false, true])); // "y" is null
        let n = input.len();
        let lagged = lag_str((&input, 0, n), 1).unwrap();

        assert!(lagged.get(0).is_none()); // nothing to pull from
        assert_eq!(lagged.get(1), Some("x"));
        assert!(lagged.get(2).is_none()); // pulled the null "y"

        let mask = lagged.null_mask.unwrap();
        for (i, want) in [false, true, false].iter().copied().enumerate() {
            assert_eq!(mask.get(i), want);
        }
    }

    /// Null propagation on the inner window "x", "y", "z" of
    /// ["w", "x", "y", "z", "q"], where "y" is null.
    #[test]
    fn test_lag_str_null_propagation_chunk() {
        let mut input = StringArray::<u32>::from_slice(&["w", "x", "y", "z", "q"]);
        input.null_mask = Some(bm(&[true, true, false, true, true]));
        let lagged = lag_str((&input, 1, 3), 1).unwrap();

        assert!(lagged.get(0).is_none()); // first row of the chunk has no source
        assert_eq!(lagged.get(1), Some("x")); // pulls the valid "x"
        assert!(lagged.get(2).is_none()); // pulls the null "y"

        let mask = lagged.null_mask.unwrap();
        for (i, want) in [false, true, false].iter().copied().enumerate() {
            assert_eq!(mask.get(i), want);
        }
    }

    /// An offset larger than the array leaves every row null.
    #[test]
    fn test_shift_str_large_offset() {
        let input = StringArray::<u32>::from_slice(&["a", "b", "c"]);
        let n = input.len();
        let shifted = shift_str((&input, 0, n), 10).unwrap();
        assert_eq!(shifted.len(), 3);
        for i in 0..3 {
            assert!(shifted.get(i).is_none());
            assert!(!shifted.null_mask.as_ref().unwrap().get(i));
        }
    }

    /// Same as above on the inner window "a", "b", "c" of ["w", "a", "b", "c", "x"].
    #[test]
    fn test_shift_str_large_offset_chunk() {
        let input = StringArray::<u32>::from_slice(&["w", "a", "b", "c", "x"]);
        let shifted = shift_str((&input, 1, 3), 10).unwrap();
        assert_eq!(shifted.len(), 3);
        for i in 0..3 {
            assert!(shifted.get(i).is_none());
            assert!(!shifted.null_mask.as_ref().unwrap().get(i));
        }
    }

    /// String rank with a null slot that hides a would-be duplicate.
    #[test]
    fn test_rank_str_ties_and_nulls() {
        let mut input = StringArray::<u32>::from_slice(&["a", "b", "a", "c"]);
        input.null_mask = Some(bm(&[true, true, false, true]));
        let n = input.len();
        let ranks = rank_str((&input, 0, n)).unwrap();
        // Expected: "a" → 1, "b" → 2, null slot → 0 (invalid), "c" → 3.
        expect_int(&ranks, &[1, 2, 0, 3], &[true, true, false, true]);
    }

    /// Same ranking on the inner window "a", "b", "a", "c" of
    /// ["w", "a", "b", "a", "c"], with the second "a" null.
    #[test]
    fn test_rank_str_ties_and_nulls_chunk() {
        let mut input = StringArray::<u32>::from_slice(&["w", "a", "b", "a", "c"]);
        input.null_mask = Some(bm(&[true, true, true, false, true]));
        let ranks = rank_str((&input, 1, 4)).unwrap();
        expect_int(&ranks, &[1, 2, 0, 3], &[true, true, false, true]);
    }
}