formualizer_eval/
window_ctx.rs

1use crate::traits::{ArgumentHandle, FunctionContext};
2use formualizer_common::{ExcelError, ExcelErrorKind, LiteralValue};
3
/// Axis along which the cells of a single window are laid out.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WindowAxis {
    /// A window spans consecutive rows within one column.
    Rows,
    /// A window spans consecutive columns within one row.
    Cols,
}
9
10#[derive(Copy, Clone, Debug)]
11pub struct WindowSpec {
12    pub width: usize,
13    pub step: usize,
14    pub axis: WindowAxis,
15    pub align_left: bool,
16    pub padding: PaddingPolicy,
17}
18
19impl Default for WindowSpec {
20    fn default() -> Self {
21        Self {
22            width: 1,
23            step: 1,
24            axis: WindowAxis::Rows,
25            align_left: true,
26            padding: PaddingPolicy::None,
27        }
28    }
29}
30
/// How window cells that fall outside the argument grid are handled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PaddingPolicy {
    /// No padding: a window that would reach past the edge is not produced.
    None,
    /// Out-of-range cells are filled with an empty value.
    Empty,
    /// Out-of-range cells repeat the nearest in-range cell (coordinates are clamped).
    EdgeExtend,
}
37
38/// Window evaluation context passed to windowed functions.
39/// This is intentionally minimal; functions may downcast via `as_any()` to
40/// access a concrete implementation with more helpers.
41/// Simple window context that wraps raw argument handles and the function context.
42pub struct SimpleWindowCtx<'a, 'b> {
43    pub args: &'a [ArgumentHandle<'a, 'b>],
44    pub fctx: &'a dyn FunctionContext,
45    pub spec: WindowSpec,
46}
47
48impl<'a, 'b> SimpleWindowCtx<'a, 'b> {
49    pub fn new(
50        args: &'a [ArgumentHandle<'a, 'b>],
51        fctx: &'a dyn FunctionContext,
52        spec: WindowSpec,
53    ) -> Self {
54        Self { args, fctx, spec }
55    }
56
57    pub fn spec(&self) -> WindowSpec {
58        self.spec
59    }
60
61    /// Iterate over aligned windows across all arguments in row-major order.
62    /// For now, only supports width == 1 (single-cell windows) with axis Rows/Cols and step >= 1.
63    /// The callback receives a slice of window cells (one per argument) at each position.
64    pub fn for_each_window(
65        &mut self,
66        mut f: impl FnMut(&[LiteralValue]) -> Result<(), ExcelError>,
67    ) -> Result<(), ExcelError> {
68        if self.spec.width != 1 {
69            return Err(ExcelError::new(ExcelErrorKind::NImpl)
70                .with_message("window width>1 not yet supported"));
71        }
72        // First pass: determine maximum dimensions from any non-empty, non-1x1 range arg.
73        // For Excel compatibility, we normalize all ranges to the maximum dimensions found,
74        // padding shorter ranges with Empty values.
75        let mut max_dims: Option<(usize, usize)> = None;
76        let mut saw_empty = false;
77        let mut range_dims: Vec<Option<(usize, usize)>> = Vec::with_capacity(self.args.len());
78
79        for arg in self.args.iter() {
80            if let Ok(view) = arg.range_view() {
81                let d = view.dims();
82                match d {
83                    (0, 0) => {
84                        saw_empty = true;
85                        range_dims.push(None);
86                    }
87                    (1, 1) => {
88                        // scalar-like; will be broadcast
89                        range_dims.push(Some((1, 1)));
90                    }
91                    other => {
92                        // Track max dimensions across all non-scalar ranges
93                        if let Some((max_r, max_c)) = max_dims {
94                            // Check if dimensions are compatible (same number of columns for column vectors, etc.)
95                            if (max_c == 1 && other.1 == 1)
96                                || (max_r == 1 && other.0 == 1)
97                                || (max_c == other.1 && max_r == other.0)
98                            {
99                                // Compatible dimensions - use the maximum
100                                max_dims = Some((max_r.max(other.0), max_c.max(other.1)));
101                            } else if max_c == other.1 {
102                                // Same width, different height - use max height
103                                max_dims = Some((max_r.max(other.0), max_c));
104                            } else if max_r == other.0 {
105                                // Same height, different width - use max width
106                                max_dims = Some((max_r, max_c.max(other.1)));
107                            } else {
108                                // Incompatible dimensions - this is an actual error
109                                return Err(ExcelError::new(ExcelErrorKind::Value).with_message(
110                                    format!(
111                                        "incompatible range dimensions: {},{} vs {},{}",
112                                        max_r, max_c, other.0, other.1
113                                    ),
114                                ));
115                            }
116                        } else {
117                            max_dims = Some(other);
118                        }
119                        range_dims.push(Some(other));
120                    }
121                }
122            } else {
123                // Scalar argument
124                range_dims.push(None);
125            }
126        }
127
128        let total = if let Some((r, c)) = max_dims {
129            r * c
130        } else if saw_empty {
131            0
132        } else {
133            1
134        };
135        // Build iterators for each argument with broadcasting and padding
136        let mut iters: Vec<Box<dyn Iterator<Item = LiteralValue>>> =
137            Vec::with_capacity(self.args.len());
138        for (i, arg) in self.args.iter().enumerate() {
139            if let Ok(view) = arg.range_view() {
140                let d = view.dims();
141                match d {
142                    (0, 0) => {
143                        // Empty range: broadcast empties to total (possibly 0)
144                        iters.push(Box::new(std::iter::repeat_n(LiteralValue::Empty, total)));
145                    }
146                    (1, 1) => {
147                        // Single cell: materialize one value and broadcast
148                        let v = view.as_1x1().unwrap_or(LiteralValue::Empty);
149                        iters.push(Box::new(std::iter::repeat_n(v, total)));
150                    }
151                    (rows, cols) => {
152                        // For non-scalar ranges, pad to match max_dims if necessary
153                        let range_total = rows * cols;
154                        let mut values: Vec<LiteralValue> = Vec::with_capacity(range_total);
155                        view.for_each_cell(&mut |v| {
156                            values.push(v.clone());
157                            Ok(())
158                        })?;
159                        if range_total < total {
160                            // Need to pad this range with Empty values
161                            let padding =
162                                std::iter::repeat_n(LiteralValue::Empty, total - range_total);
163                            iters.push(Box::new(values.into_iter().chain(padding)));
164                        } else {
165                            // No padding needed
166                            iters.push(Box::new(values.into_iter()));
167                        }
168                    }
169                }
170            } else if let Ok(v) = arg.value() {
171                let vv = v.into_owned();
172                iters.push(Box::new(std::iter::repeat_n(vv, total)));
173            } else {
174                iters.push(Box::new(std::iter::repeat_n(
175                    LiteralValue::Error(ExcelError::new(ExcelErrorKind::Value)),
176                    total,
177                )));
178            }
179        }
180        // Create a vector to hold current window cells (one per arg)
181        let mut window_cells: Vec<LiteralValue> = vec![LiteralValue::Empty; iters.len()];
182        for _idx in 0..total {
183            // cancellation
184            if let Some(cancel) = self.fctx.cancellation_token() {
185                if cancel.load(std::sync::atomic::Ordering::Relaxed) {
186                    return Err(ExcelError::new(ExcelErrorKind::Cancelled));
187                }
188            }
189            for (i, it) in iters.iter_mut().enumerate() {
190                window_cells[i] = it.next().unwrap_or(LiteralValue::Empty);
191            }
192            f(&window_cells[..])?;
193        }
194        Ok(())
195    }
196
197    /// Multi-width window iteration. Produces, for each window position, a Vec per argument
198    /// containing the window's cells in window order (k=0..width-1) along the selected axis.
199    /// Padding behavior is controlled by `spec.padding`.
200    pub fn for_each_window_multi(
201        &mut self,
202        mut f: impl FnMut(&[Vec<LiteralValue>]) -> Result<(), ExcelError>,
203    ) -> Result<(), ExcelError> {
204        let spec = self.spec;
205        let width = spec.width.max(1);
206        // Determine maximum dims from any non-empty, non-1x1 range arg for Excel compatibility
207        let mut max_dims: Option<(usize, usize)> = None;
208        let mut saw_empty = false;
209        for arg in self.args.iter() {
210            if let Ok(view) = arg.range_view() {
211                let d = view.dims();
212                match d {
213                    (0, 0) => saw_empty = true,
214                    (1, 1) => (),
215                    other => {
216                        if let Some((max_r, max_c)) = max_dims {
217                            // Use maximum dimensions for compatibility
218                            if (max_c == 1 && other.1 == 1)
219                                || (max_r == 1 && other.0 == 1)
220                                || (max_c == other.1 && max_r == other.0)
221                            {
222                                max_dims = Some((max_r.max(other.0), max_c.max(other.1)));
223                            } else if max_c == other.1 {
224                                max_dims = Some((max_r.max(other.0), max_c));
225                            } else if max_r == other.0 {
226                                max_dims = Some((max_r, max_c.max(other.1)));
227                            } else {
228                                return Err(ExcelError::new(ExcelErrorKind::Value).with_message(
229                                    format!(
230                                        "incompatible range dimensions: {},{} vs {},{}",
231                                        max_r, max_c, other.0, other.1
232                                    ),
233                                ));
234                            }
235                        } else {
236                            max_dims = Some(other);
237                        }
238                    }
239                }
240            }
241        }
242        let (rows, cols) = if let Some(d) = max_dims {
243            d
244        } else if saw_empty {
245            (0, 0)
246        } else {
247            (1, 1)
248        };
249
250        // Materialize/broadcast each argument into a flat row-major Vec for indexed access
251        let total = rows * cols;
252        let mut flats: Vec<Vec<LiteralValue>> = Vec::with_capacity(self.args.len());
253        for arg in self.args.iter() {
254            if let Ok(view) = arg.range_view() {
255                let d = view.dims();
256                match d {
257                    (0, 0) => {
258                        // Broadcast empties to total (may be 0)
259                        flats.push(std::iter::repeat_n(LiteralValue::Empty, total).collect());
260                    }
261                    (1, 1) => {
262                        let v = view.as_1x1().unwrap_or(LiteralValue::Empty);
263                        flats.push(std::iter::repeat_n(v, total).collect());
264                    }
265                    (r, c) => {
266                        // Collect values and pad if necessary
267                        let mut values: Vec<LiteralValue> = Vec::with_capacity(r * c);
268                        view.for_each_cell(&mut |v| {
269                            values.push(v.clone());
270                            Ok(())
271                        })?;
272                        let range_total = r * c;
273                        if range_total < total {
274                            // Pad with Empty values to match total
275                            values.extend(std::iter::repeat_n(
276                                LiteralValue::Empty,
277                                total - range_total,
278                            ));
279                        }
280                        flats.push(values);
281                    }
282                }
283            } else if let Ok(v) = arg.value() {
284                flats.push(std::iter::repeat_n(v.into_owned(), total).collect());
285            } else {
286                flats.push(
287                    std::iter::repeat_n(
288                        LiteralValue::Error(ExcelError::new(ExcelErrorKind::Value)),
289                        total,
290                    )
291                    .collect(),
292                );
293            }
294        }
295
296        // Helper to index with padding
297        let get_idx = |r: isize, c: isize| -> Option<usize> {
298            if r >= 0 && (r as usize) < rows && c >= 0 && (c as usize) < cols {
299                Some((r as usize) * cols + (c as usize))
300            } else {
301                match spec.padding {
302                    PaddingPolicy::None => None,
303                    PaddingPolicy::Empty => None, // signal as None; we'll push Empty
304                    PaddingPolicy::EdgeExtend => {
305                        let rr = r.clamp(0, rows as isize - 1) as usize;
306                        let cc = c.clamp(0, cols as isize - 1) as usize;
307                        Some(rr * cols + cc)
308                    }
309                }
310            }
311        };
312
313        match spec.axis {
314            WindowAxis::Rows => {
315                for c in 0..cols {
316                    let mut sr = 0usize;
317                    while sr < rows {
318                        // cancellation
319                        if let Some(cancel) = self.fctx.cancellation_token() {
320                            if cancel.load(std::sync::atomic::Ordering::Relaxed) {
321                                return Err(ExcelError::new(ExcelErrorKind::Cancelled));
322                            }
323                        }
324                        // Build window per argument
325                        let mut windows: Vec<Vec<LiteralValue>> = Vec::with_capacity(flats.len());
326                        let mut skip = false;
327                        for flat in flats.iter() {
328                            let mut win: Vec<LiteralValue> = Vec::with_capacity(width);
329                            for k in 0..width {
330                                let rr = sr as isize + k as isize;
331                                match get_idx(rr, c as isize) {
332                                    Some(idx) => win.push(flat[idx].clone()),
333                                    None => {
334                                        if spec.padding == PaddingPolicy::None {
335                                            skip = true;
336                                            break;
337                                        } else {
338                                            win.push(LiteralValue::Empty);
339                                        }
340                                    }
341                                }
342                            }
343                            if skip {
344                                break;
345                            }
346                            windows.push(win);
347                        }
348                        if !skip {
349                            f(&windows[..])?;
350                        }
351                        sr = sr.saturating_add(spec.step.max(1));
352                    }
353                }
354            }
355            WindowAxis::Cols => {
356                for r in 0..rows {
357                    let mut sc = 0usize;
358                    while sc < cols {
359                        if let Some(cancel) = self.fctx.cancellation_token() {
360                            if cancel.load(std::sync::atomic::Ordering::Relaxed) {
361                                return Err(ExcelError::new(ExcelErrorKind::Cancelled));
362                            }
363                        }
364                        let mut windows: Vec<Vec<LiteralValue>> = Vec::with_capacity(flats.len());
365                        let mut skip = false;
366                        for flat in flats.iter() {
367                            let mut win: Vec<LiteralValue> = Vec::with_capacity(width);
368                            for k in 0..width {
369                                let cc = sc as isize + k as isize;
370                                match get_idx(r as isize, cc) {
371                                    Some(idx) => win.push(flat[idx].clone()),
372                                    None => {
373                                        if spec.padding == PaddingPolicy::None {
374                                            skip = true;
375                                            break;
376                                        } else {
377                                            win.push(LiteralValue::Empty);
378                                        }
379                                    }
380                                }
381                            }
382                            if skip {
383                                break;
384                            }
385                            windows.push(win);
386                        }
387                        if !skip {
388                            f(&windows[..])?;
389                        }
390                        sc = sc.saturating_add(spec.step.max(1));
391                    }
392                }
393            }
394        }
395        Ok(())
396    }
397
398    /// Reduce over windows with optional parallel chunking. The reducer functions must be pure
399    /// over their inputs and combine must be associative to ensure deterministic results.
400    pub fn reduce_windows<T, FI, FF, FC>(
401        &mut self,
402        init: FI,
403        fold: FF,
404        combine: FC,
405    ) -> Result<T, ExcelError>
406    where
407        T: Send,
408        FI: Fn() -> T + Sync,
409        FF: Fn(&[Vec<LiteralValue>], &mut T) -> Result<(), ExcelError> + Sync,
410        FC: Fn(T, T) -> T + Sync,
411    {
412        // Prepare flattened, broadcasted argument data similar to for_each_window_multi
413        let spec = self.spec;
414        let mut max_dims: Option<(usize, usize)> = None;
415        let mut saw_empty = false;
416        for arg in self.args.iter() {
417            if let Ok(view) = arg.range_view() {
418                let d = view.dims();
419                match d {
420                    (0, 0) => saw_empty = true,
421                    (1, 1) => (),
422                    other => {
423                        if let Some((max_r, max_c)) = max_dims {
424                            if (max_c == 1 && other.1 == 1)
425                                || (max_r == 1 && other.0 == 1)
426                                || (max_c == other.1 && max_r == other.0)
427                            {
428                                max_dims = Some((max_r.max(other.0), max_c.max(other.1)));
429                            } else if max_c == other.1 {
430                                max_dims = Some((max_r.max(other.0), max_c));
431                            } else if max_r == other.0 {
432                                max_dims = Some((max_r, max_c.max(other.1)));
433                            } else {
434                                return Err(ExcelError::new(ExcelErrorKind::Value).with_message(
435                                    format!(
436                                        "incompatible range dimensions: {},{} vs {},{}",
437                                        max_r, max_c, other.0, other.1
438                                    ),
439                                ));
440                            }
441                        } else {
442                            max_dims = Some(other);
443                        }
444                    }
445                }
446            }
447        }
448        let (rows, cols) = if let Some(d) = max_dims {
449            d
450        } else if saw_empty {
451            (0, 0)
452        } else {
453            (1, 1)
454        };
455
456        let total = rows * cols;
457        let mut flats: Vec<Vec<LiteralValue>> = Vec::with_capacity(self.args.len());
458        for arg in self.args.iter() {
459            if let Ok(view) = arg.range_view() {
460                let d = view.dims();
461                match d {
462                    (0, 0) => flats.push(std::iter::repeat_n(LiteralValue::Empty, total).collect()),
463                    (1, 1) => {
464                        let v = view.as_1x1().unwrap_or(LiteralValue::Empty);
465                        flats.push(std::iter::repeat_n(v, total).collect());
466                    }
467                    (r, c) => {
468                        let mut values: Vec<LiteralValue> = Vec::with_capacity(r * c);
469                        view.for_each_cell(&mut |v| {
470                            values.push(v.clone());
471                            Ok(())
472                        })?;
473                        let range_total = r * c;
474                        if range_total < total {
475                            values.extend(std::iter::repeat_n(
476                                LiteralValue::Empty,
477                                total - range_total,
478                            ));
479                        }
480                        flats.push(values);
481                    }
482                }
483            } else if let Ok(v) = arg.value() {
484                flats.push(std::iter::repeat_n(v.into_owned(), total).collect());
485            } else {
486                flats.push(
487                    std::iter::repeat_n(
488                        LiteralValue::Error(ExcelError::new(ExcelErrorKind::Value)),
489                        total,
490                    )
491                    .collect(),
492                );
493            }
494        }
495
496        // Helper for bounds/padding
497        let get_idx = |r: isize, c: isize| -> Option<usize> {
498            if r >= 0 && (r as usize) < rows && c >= 0 && (c as usize) < cols {
499                Some((r as usize) * cols + (c as usize))
500            } else {
501                match spec.padding {
502                    PaddingPolicy::None => None,
503                    PaddingPolicy::Empty => None,
504                    PaddingPolicy::EdgeExtend => {
505                        let rr = r.clamp(0, rows as isize - 1) as usize;
506                        let cc = c.clamp(0, cols as isize - 1) as usize;
507                        Some(rr * cols + cc)
508                    }
509                }
510            }
511        };
512
513        // Used-region clamp (data-driven): find leading/trailing all-empty rows/cols for large shapes
514        let mut row_start = 0usize;
515        let mut row_end = rows;
516        let mut col_start = 0usize;
517        let mut col_end = cols;
518        if rows > 1000 && cols > 0 {
519            'outer_top: for r in 0..rows {
520                for flat in &flats {
521                    let base = r * cols;
522                    if flat[base..base + cols]
523                        .iter()
524                        .any(|v| !matches!(v, LiteralValue::Empty))
525                    {
526                        row_start = r;
527                        break 'outer_top;
528                    }
529                }
530            }
531            'outer_bot: for r in (row_start..rows).rev() {
532                for flat in &flats {
533                    let base = r * cols;
534                    if flat[base..base + cols]
535                        .iter()
536                        .any(|v| !matches!(v, LiteralValue::Empty))
537                    {
538                        row_end = r + 1; // exclusive
539                        break 'outer_bot;
540                    }
541                }
542            }
543        }
544        if cols > 1000 && rows > 0 {
545            'outer_left: for c in 0..cols {
546                for flat in &flats {
547                    let mut any = false;
548                    for r in row_start..row_end {
549                        let idx = r * cols + c;
550                        if !matches!(flat[idx], LiteralValue::Empty) {
551                            any = true;
552                            break;
553                        }
554                    }
555                    if any {
556                        col_start = c;
557                        break 'outer_left;
558                    }
559                }
560            }
561            'outer_right: for c in (col_start..cols).rev() {
562                for flat in &flats {
563                    let mut any = false;
564                    for r in row_start..row_end {
565                        let idx = r * cols + c;
566                        if !matches!(flat[idx], LiteralValue::Empty) {
567                            any = true;
568                            break;
569                        }
570                    }
571                    if any {
572                        col_end = c + 1; // exclusive
573                        break 'outer_right;
574                    }
575                }
576            }
577        }
578        let eff_rows = row_end.saturating_sub(row_start);
579        let eff_cols = col_end.saturating_sub(col_start);
580        let eff_total = eff_rows * eff_cols;
581
582        // Heuristics for parallelism
583        // Use context chunk_hint to decide when to switch to parallel; default (256x256)/4 = 16,384
584        let hint = self.fctx.chunk_hint().unwrap_or(65_536);
585        let min_cells: usize = if cfg!(test) {
586            2_000
587        } else {
588            (hint / 4).max(8_192)
589        };
590        let can_parallel = self.fctx.thread_pool().is_some() && eff_total >= min_cells;
591
592        // Local function to process a range of the major axis
593        let flats_ref = &flats;
594        let spec_copy = spec;
595        let init_ref = &init;
596        let process_range = move |sr: usize, er: usize| -> Result<T, ExcelError> {
597            let mut acc = init_ref();
598            match spec_copy.axis {
599                WindowAxis::Rows => {
600                    let step = spec_copy.step.max(1);
601                    let width = spec_copy.width.max(1);
602
603                    // Fast-path for width==1: avoid per-row Vec allocations
604                    if width == 1 {
605                        for r in (row_start + sr..row_start + er).step_by(step) {
606                            for c in col_start..col_end {
607                                match get_idx(r as isize, c as isize) {
608                                    Some(idx) => {
609                                        // Create single-element windows without allocating new Vecs
610                                        let mut windows: Vec<Vec<LiteralValue>> =
611                                            Vec::with_capacity(flats_ref.len());
612                                        for flat in flats_ref.iter() {
613                                            windows.push(vec![flat[idx].clone()]);
614                                        }
615                                        fold(&windows[..], &mut acc)?;
616                                    }
617                                    None => {
618                                        if spec_copy.padding != PaddingPolicy::None {
619                                            let mut windows: Vec<Vec<LiteralValue>> =
620                                                Vec::with_capacity(flats_ref.len());
621                                            for _ in flats_ref.iter() {
622                                                windows.push(vec![LiteralValue::Empty]);
623                                            }
624                                            fold(&windows[..], &mut acc)?;
625                                        }
626                                    }
627                                }
628                            }
629                        }
630                    } else {
631                        // Original multi-width path
632                        for r in (row_start + sr..row_start + er).step_by(step) {
633                            for c in col_start..col_end {
634                                // Build window vectors per argument of length width along rows
635                                let mut windows: Vec<Vec<LiteralValue>> =
636                                    Vec::with_capacity(flats_ref.len());
637                                let mut skip = false;
638                                for flat in flats_ref.iter() {
639                                    let mut win: Vec<LiteralValue> = Vec::with_capacity(width);
640                                    for k in 0..width {
641                                        let rr = r as isize + k as isize;
642                                        match get_idx(rr, c as isize) {
643                                            Some(idx) => win.push(flat[idx].clone()),
644                                            None => {
645                                                if spec_copy.padding == PaddingPolicy::None {
646                                                    skip = true;
647                                                    break;
648                                                } else {
649                                                    win.push(LiteralValue::Empty);
650                                                }
651                                            }
652                                        }
653                                    }
654                                    if skip {
655                                        break;
656                                    }
657                                    windows.push(win);
658                                }
659                                if !skip {
660                                    fold(&windows[..], &mut acc)?;
661                                }
662                            }
663                        }
664                    }
665                }
666                WindowAxis::Cols => {
667                    let step = spec_copy.step.max(1);
668                    let width = spec_copy.width.max(1);
669
670                    // Fast-path for width==1: avoid per-row Vec allocations
671                    if width == 1 {
672                        for c in (col_start + sr..col_start + er).step_by(step) {
673                            for r in row_start..row_end {
674                                match get_idx(r as isize, c as isize) {
675                                    Some(idx) => {
676                                        // Create single-element windows without allocating new Vecs
677                                        let mut windows: Vec<Vec<LiteralValue>> =
678                                            Vec::with_capacity(flats_ref.len());
679                                        for flat in flats_ref.iter() {
680                                            windows.push(vec![flat[idx].clone()]);
681                                        }
682                                        fold(&windows[..], &mut acc)?;
683                                    }
684                                    None => {
685                                        if spec_copy.padding != PaddingPolicy::None {
686                                            let mut windows: Vec<Vec<LiteralValue>> =
687                                                Vec::with_capacity(flats_ref.len());
688                                            for _ in flats_ref.iter() {
689                                                windows.push(vec![LiteralValue::Empty]);
690                                            }
691                                            fold(&windows[..], &mut acc)?;
692                                        }
693                                    }
694                                }
695                            }
696                        }
697                    } else {
698                        // Original multi-width path
699                        for c in (col_start + sr..col_start + er).step_by(step) {
700                            for r in row_start..row_end {
701                                let mut windows: Vec<Vec<LiteralValue>> =
702                                    Vec::with_capacity(flats_ref.len());
703                                let mut skip = false;
704                                for flat in flats_ref.iter() {
705                                    let mut win: Vec<LiteralValue> = Vec::with_capacity(width);
706                                    for k in 0..width {
707                                        let cc = c as isize + k as isize;
708                                        match get_idx(r as isize, cc) {
709                                            Some(idx) => win.push(flat[idx].clone()),
710                                            None => {
711                                                if spec_copy.padding == PaddingPolicy::None {
712                                                    skip = true;
713                                                    break;
714                                                } else {
715                                                    win.push(LiteralValue::Empty);
716                                                }
717                                            }
718                                        }
719                                    }
720                                    if skip {
721                                        break;
722                                    }
723                                    windows.push(win);
724                                }
725                                if !skip {
726                                    fold(&windows[..], &mut acc)?;
727                                }
728                            }
729                        }
730                    }
731                }
732            }
733            Ok(acc)
734        };
735
736        if can_parallel {
737            let pool = self.fctx.thread_pool().unwrap().clone();
738            let threads = pool.current_num_threads().max(1);
739            let (major_len, partitions) = match spec.axis {
740                WindowAxis::Rows => (eff_rows.max(1), threads.min(eff_rows.max(1))),
741                WindowAxis::Cols => (eff_cols.max(1), threads.min(eff_cols.max(1))),
742            };
743            let chunk = major_len.div_ceil(partitions);
744            use rayon::prelude::*;
745            let result = pool.install(|| {
746                (0..partitions)
747                    .into_par_iter()
748                    .map(|i| {
749                        let start = i * chunk;
750                        let end = ((i + 1) * chunk).min(major_len);
751                        if start >= end {
752                            // empty acc
753                            return Ok(init_ref());
754                        }
755                        process_range(start, end)
756                    })
757                    .collect::<Result<Vec<T>, ExcelError>>()
758            })?;
759            // Deterministic combine in partition order
760            let mut acc = init_ref();
761            for part in result.into_iter() {
762                acc = combine(acc, part);
763            }
764            Ok(acc)
765        } else {
766            let major_len = match spec.axis {
767                WindowAxis::Rows => eff_rows,
768                WindowAxis::Cols => eff_cols,
769            };
770            process_range(0, major_len)
771        }
772    }
773}
774
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_workbook::TestWorkbook;
    use crate::traits::ArgumentHandle;
    use formualizer_common::LiteralValue;
    use formualizer_parse::parser::{ASTNode, ASTNodeType};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Returns the interpreter exposed by the given test workbook.
    fn interp(wb: &TestWorkbook) -> crate::interpreter::Interpreter<'_> {
        wb.interpreter()
    }

    /// Wraps `v` in a literal AST node; the node's second constructor argument is left as `None`.
    fn lit(v: LiteralValue) -> ASTNode {
        let kind = ASTNodeType::Literal(v);
        ASTNode::new(kind, None)
    }

    /// Builds a left-aligned spec with `PaddingPolicy::None` for the given width, step and axis.
    fn unpadded_spec(width: usize, step: usize, axis: WindowAxis) -> WindowSpec {
        WindowSpec {
            width,
            step,
            axis,
            align_left: true,
            padding: PaddingPolicy::None,
        }
    }

    /// Sums one window as `i64`: `Int` cells are taken as-is, `Number` cells are cast
    /// with `as i64`, and every other variant contributes 0.
    fn window_total(cells: &[LiteralValue]) -> i64 {
        let mut total = 0i64;
        for cell in cells {
            total += match cell {
                LiteralValue::Int(i) => *i,
                LiteralValue::Number(n) => *n as i64,
                _ => 0,
            };
        }
        total
    }

    /// Returns the last cell of a window when it is an `Int`, otherwise `None`
    /// (also `None` for an empty window).
    fn trailing_int(cells: &[LiteralValue]) -> Option<i64> {
        match cells.last() {
            Some(LiteralValue::Int(i)) => Some(*i),
            _ => None,
        }
    }

    /// Produces `pad` empties, then the integers `1..=count`, then `pad` empties again.
    fn padded_ints(pad: usize, count: i64) -> Vec<LiteralValue> {
        let blanks = || std::iter::repeat_n(LiteralValue::Empty, pad);
        blanks()
            .chain((1..=count).map(LiteralValue::Int))
            .chain(blanks())
            .collect()
    }

    /// Row-axis windows of width 3 advancing by 2 over a 6x1 column of 1..=6.
    #[test]
    fn reduce_rows_width3_step2_sum() {
        let wb = TestWorkbook::new();
        let ctx = interp(&wb);
        // 6x1 column holding 1..=6, one value per row.
        let rows: Vec<Vec<LiteralValue>> = (1..=6)
            .map(LiteralValue::Int)
            .map(|cell| vec![cell])
            .collect();
        let col = lit(LiteralValue::Array(rows));
        let args = vec![ArgumentHandle::new(&col, &ctx)];
        let fctx = ctx.function_context(None);
        let mut wctx = SimpleWindowCtx::new(&args, &fctx, unpadded_spec(3, 2, WindowAxis::Rows));
        // Accumulate the sum of every complete window; without padding the trailing
        // partial window is dropped.
        let total = wctx
            .reduce_windows(
                || 0i64,
                |wins, acc| {
                    *acc += window_total(&wins[0]);
                    Ok(())
                },
                |lhs, rhs| lhs + rhs,
            )
            .unwrap();
        // r=0 -> [1,2,3] = 6, r=2 -> [3,4,5] = 12, r=4 is partial and skipped => 18.
        assert_eq!(total, 18);
    }

    /// Column-axis windows of width 2 advancing by 3 over a 1x7 row of 1..=7.
    #[test]
    fn reduce_cols_width2_step3_sum() {
        let wb = TestWorkbook::new();
        let ctx = interp(&wb);
        // 1x7 row holding 1..=7.
        let cells: Vec<LiteralValue> = (1..=7).map(LiteralValue::Int).collect();
        let row = lit(LiteralValue::Array(vec![cells]));
        let args = vec![ArgumentHandle::new(&row, &ctx)];
        let fctx = ctx.function_context(None);
        let mut wctx = SimpleWindowCtx::new(&args, &fctx, unpadded_spec(2, 3, WindowAxis::Cols));
        let total = wctx
            .reduce_windows(
                || 0i64,
                |wins, acc| {
                    *acc += window_total(&wins[0]);
                    Ok(())
                },
                |lhs, rhs| lhs + rhs,
            )
            .unwrap();
        // c=0 -> [1,2] = 3, c=3 -> [4,5] = 9, c=6 is partial and skipped => 12.
        assert_eq!(total, 12);
    }

    /// Leading and trailing empty rows must not be visited by the fold callback.
    #[test]
    fn used_region_clamp_rows() {
        let wb = TestWorkbook::new();
        let ctx = interp(&wb);
        // 5000x1 column: 1000 empties, the numbers 1..=3000, then 1000 empties.
        let data: Vec<Vec<LiteralValue>> = padded_ints(1000, 3000)
            .into_iter()
            .map(|cell| vec![cell])
            .collect();
        let col = lit(LiteralValue::Array(data));
        let args = vec![ArgumentHandle::new(&col, &ctx)];
        let fctx = ctx.function_context(None);
        let mut wctx = SimpleWindowCtx::new(&args, &fctx, unpadded_spec(1, 1, WindowAxis::Rows));
        // Shared counter recording how many windows the fold callback actually sees.
        let visits = Arc::new(AtomicUsize::new(0));
        let visits_in_fold = Arc::clone(&visits);
        let sum = wctx
            .reduce_windows(
                || 0i64,
                move |wins, acc| {
                    visits_in_fold.fetch_add(1, Ordering::Relaxed);
                    if let Some(i) = trailing_int(&wins[0]) {
                        *acc += i;
                    }
                    Ok(())
                },
                |lhs, rhs| lhs + rhs,
            )
            .unwrap();
        // The clamp should leave exactly the 3000 populated rows.
        assert_eq!(visits.load(Ordering::Relaxed), 3000);
        // Sum of 1..=3000.
        assert_eq!(sum, (3000i64 * 3001i64) / 2);
    }

    /// Leading and trailing empty columns must not be visited by the fold callback.
    #[test]
    fn used_region_clamp_cols() {
        let wb = TestWorkbook::new();
        let ctx = interp(&wb);
        // 1x6000 row: 1000 empties, the numbers 1..=4000, then 1000 empties.
        let row = padded_ints(1000, 4000);
        let arr = lit(LiteralValue::Array(vec![row]));
        let args = vec![ArgumentHandle::new(&arr, &ctx)];
        let fctx = ctx.function_context(None);
        let mut wctx = SimpleWindowCtx::new(&args, &fctx, unpadded_spec(1, 1, WindowAxis::Cols));
        // Shared counter recording how many windows the fold callback actually sees.
        let visits = Arc::new(AtomicUsize::new(0));
        let visits_in_fold = Arc::clone(&visits);
        let sum = wctx
            .reduce_windows(
                || 0i64,
                move |wins, acc| {
                    visits_in_fold.fetch_add(1, Ordering::Relaxed);
                    if let Some(i) = trailing_int(&wins[0]) {
                        *acc += i;
                    }
                    Ok(())
                },
                |lhs, rhs| lhs + rhs,
            )
            .unwrap();
        // The clamp should leave exactly the 4000 populated columns.
        assert_eq!(visits.load(Ordering::Relaxed), 4000);
        // Sum of 1..=4000.
        assert_eq!(sum, (4000i64 * 4001i64) / 2);
    }
}