Skip to main content

formualizer_eval/engine/
range_view.rs

1use crate::arrow_store;
2use crate::arrow_store::IngestBuilder;
3use crate::stripes::NumericChunk;
4use arrow_array::Array;
5use arrow_schema::DataType;
6use formualizer_common::{CoercionPolicy, DateSystem, ExcelError, LiteralValue};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9
10#[derive(Clone)]
11pub enum RangeBacking<'a> {
12    Borrowed(&'a arrow_store::ArrowSheet),
13    Owned(Arc<arrow_store::ArrowSheet>),
14}
15
16/// Unified view over a 2D range with efficient traversal utilities.
17/// Phase 4: Arrow-only backing.
18#[derive(Clone)]
19pub struct RangeView<'a> {
20    backing: RangeBacking<'a>,
21    sr: usize,
22    sc: usize,
23    er: usize,
24    ec: usize,
25    rows: usize,
26    cols: usize,
27    cancel_token: Option<Arc<AtomicBool>>,
28}
29
30impl<'a> core::fmt::Debug for RangeView<'a> {
31    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
32        f.debug_struct("RangeView")
33            .field("rows", &self.rows)
34            .field("cols", &self.cols)
35            .field("kind", &self.kind_probe())
36            .finish()
37    }
38}
39
/// Coarse classification of a range's contents, as produced by
/// `RangeView::kind_probe`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RangeKind {
    /// The view is empty or holds only empty cells.
    Empty,
    /// Every non-empty cell is a number.
    NumericOnly,
    /// Every non-empty cell is text.
    TextOnly,
    /// Numbers and text both occur, or some cell holds another kind of value.
    Mixed,
}
47
48pub struct ChunkCol {
49    pub numbers: Option<arrow_array::ArrayRef>,
50    pub booleans: Option<arrow_array::ArrayRef>,
51    pub text: Option<arrow_array::ArrayRef>,
52    pub errors: Option<arrow_array::ArrayRef>,
53    pub type_tag: arrow_array::ArrayRef,
54}
55
56pub struct ChunkSlice {
57    pub row_start: usize, // relative to view top
58    pub row_len: usize,
59    pub cols: Vec<ChunkCol>,
60}
61
62pub struct RowChunkIterator<'a> {
63    view: &'a RangeView<'a>,
64    current_chunk_idx: usize,
65}
66
67impl<'a> Iterator for RowChunkIterator<'a> {
68    type Item = Result<ChunkSlice, ExcelError>;
69
70    fn next(&mut self) -> Option<Self::Item> {
71        if let Some(token) = &self.view.cancel_token
72            && token.load(Ordering::Relaxed)
73        {
74            return Some(Err(ExcelError::new(
75                formualizer_common::ExcelErrorKind::Cancelled,
76            )));
77        }
78
79        let sheet = self.view.sheet();
80        let chunk_starts = &sheet.chunk_starts;
81        let sheet_rows = sheet.nrows as usize;
82        let row_end = self.view.er.min(sheet_rows.saturating_sub(1));
83
84        while self.current_chunk_idx < chunk_starts.len() {
85            let ci = self.current_chunk_idx;
86            let start = chunk_starts[ci];
87            self.current_chunk_idx += 1;
88
89            let end = if ci + 1 < chunk_starts.len() {
90                chunk_starts[ci + 1]
91            } else {
92                sheet_rows
93            };
94            let len = end.saturating_sub(start);
95            if len == 0 {
96                continue;
97            }
98            let chunk_end_abs = start + len - 1;
99            let is = start.max(self.view.sr);
100            let ie = chunk_end_abs.min(row_end);
101            if is > ie {
102                continue;
103            }
104            let seg_len = ie - is + 1;
105            let rel_off = is - start;
106
107            let mut cols = Vec::with_capacity(self.view.cols);
108            for col_idx in self.view.sc..=self.view.ec {
109                if col_idx >= sheet.columns.len() {
110                    let numbers = Some(arrow_array::new_null_array(&DataType::Float64, seg_len));
111                    let booleans = Some(arrow_array::new_null_array(&DataType::Boolean, seg_len));
112                    let text = Some(arrow_array::new_null_array(&DataType::Utf8, seg_len));
113                    let errors = Some(arrow_array::new_null_array(&DataType::UInt8, seg_len));
114                    let type_tag: arrow_array::ArrayRef =
115                        Arc::new(arrow_array::UInt8Array::from(vec![
116                            arrow_store::TypeTag::Empty
117                                as u8;
118                            seg_len
119                        ]));
120                    cols.push(ChunkCol {
121                        numbers,
122                        booleans,
123                        text,
124                        errors,
125                        type_tag,
126                    });
127                } else {
128                    let col = &sheet.columns[col_idx];
129                    let Some(ch) = col.chunk(ci) else {
130                        let numbers =
131                            Some(arrow_array::new_null_array(&DataType::Float64, seg_len));
132                        let booleans =
133                            Some(arrow_array::new_null_array(&DataType::Boolean, seg_len));
134                        let text = Some(arrow_array::new_null_array(&DataType::Utf8, seg_len));
135                        let errors = Some(arrow_array::new_null_array(&DataType::UInt8, seg_len));
136                        let type_tag: arrow_array::ArrayRef =
137                            Arc::new(arrow_array::UInt8Array::from(vec![
138                                arrow_store::TypeTag::Empty
139                                    as u8;
140                                seg_len
141                            ]));
142                        cols.push(ChunkCol {
143                            numbers,
144                            booleans,
145                            text,
146                            errors,
147                            type_tag,
148                        });
149                        continue;
150                    };
151
152                    let numbers_base: arrow_array::ArrayRef = ch.numbers_or_null();
153                    let booleans_base: arrow_array::ArrayRef = ch.booleans_or_null();
154                    let text_base: arrow_array::ArrayRef = ch.text_or_null();
155                    let errors_base: arrow_array::ArrayRef = ch.errors_or_null();
156
157                    let numbers = Some(numbers_base.slice(rel_off, seg_len));
158                    let booleans = Some(booleans_base.slice(rel_off, seg_len));
159                    let text = Some(text_base.slice(rel_off, seg_len));
160                    let errors = Some(errors_base.slice(rel_off, seg_len));
161                    let type_tag: arrow_array::ArrayRef =
162                        Arc::new(ch.type_tag.slice(rel_off, seg_len));
163                    cols.push(ChunkCol {
164                        numbers,
165                        booleans,
166                        text,
167                        errors,
168                        type_tag,
169                    });
170                }
171            }
172            return Some(Ok(ChunkSlice {
173                row_start: is - self.view.sr,
174                row_len: seg_len,
175                cols,
176            }));
177        }
178        None
179    }
180}
181
182impl<'a> RangeView<'a> {
183    pub(crate) fn new(
184        backing: RangeBacking<'a>,
185        sr: usize,
186        sc: usize,
187        er: usize,
188        ec: usize,
189        rows: usize,
190        cols: usize,
191    ) -> Self {
192        Self {
193            backing,
194            sr,
195            sc,
196            er,
197            ec,
198            rows,
199            cols,
200            cancel_token: None,
201        }
202    }
203
204    #[must_use]
205    pub fn with_cancel_token(mut self, token: Option<Arc<AtomicBool>>) -> Self {
206        self.cancel_token = token;
207        self
208    }
209
210    #[inline]
211    pub fn sheet(&self) -> &arrow_store::ArrowSheet {
212        match &self.backing {
213            RangeBacking::Borrowed(s) => s,
214            RangeBacking::Owned(s) => s,
215        }
216    }
217
218    pub fn from_owned_rows(
219        rows: Vec<Vec<LiteralValue>>,
220        date_system: DateSystem,
221    ) -> RangeView<'static> {
222        let nrows = rows.len();
223        let ncols = rows.iter().map(|r| r.len()).max().unwrap_or(0);
224
225        let chunk_rows = 32 * 1024;
226        let mut ib = IngestBuilder::new("__tmp", ncols, chunk_rows, date_system);
227
228        for mut r in rows {
229            r.resize(ncols, LiteralValue::Empty);
230            ib.append_row(&r).expect("append_row for RangeView");
231        }
232
233        let sheet = Arc::new(ib.finish());
234
235        if nrows == 0 || ncols == 0 {
236            return RangeView {
237                backing: RangeBacking::Owned(sheet),
238                sr: 1,
239                sc: 1,
240                er: 0,
241                ec: 0,
242                rows: 0,
243                cols: 0,
244                cancel_token: None,
245            };
246        }
247
248        RangeView {
249            backing: RangeBacking::Owned(sheet),
250            sr: 0,
251            sc: 0,
252            er: nrows - 1,
253            ec: ncols - 1,
254            rows: nrows,
255            cols: ncols,
256            cancel_token: None,
257        }
258    }
259
260    pub fn dims(&self) -> (usize, usize) {
261        (self.rows, self.cols)
262    }
263
264    pub fn expand_to(&self, rows: usize, cols: usize) -> RangeView<'a> {
265        let er = self.sr + rows.saturating_sub(1);
266        let ec = self.sc + cols.saturating_sub(1);
267        RangeView {
268            backing: match &self.backing {
269                RangeBacking::Borrowed(s) => RangeBacking::Borrowed(s),
270                RangeBacking::Owned(s) => RangeBacking::Owned(s.clone()),
271            },
272            sr: self.sr,
273            sc: self.sc,
274            er,
275            ec,
276            rows,
277            cols,
278            cancel_token: self.cancel_token.clone(),
279        }
280    }
281
282    pub fn sub_view(&self, rs: usize, cs: usize, rows: usize, cols: usize) -> RangeView<'a> {
283        let abs_sr = self.sr + rs;
284        let abs_sc = self.sc + cs;
285        let er = abs_sr + rows.saturating_sub(1);
286        let ec = abs_sc + cols.saturating_sub(1);
287        RangeView {
288            backing: match &self.backing {
289                RangeBacking::Borrowed(s) => RangeBacking::Borrowed(s),
290                RangeBacking::Owned(s) => RangeBacking::Owned(s.clone()),
291            },
292            sr: abs_sr,
293            sc: abs_sc,
294            er,
295            ec,
296            rows,
297            cols,
298            cancel_token: self.cancel_token.clone(),
299        }
300    }
301
302    #[inline]
303    pub fn is_empty(&self) -> bool {
304        self.rows == 0 || self.cols == 0
305    }
306
307    /// Absolute 0-based start row of this view.
308    pub fn start_row(&self) -> usize {
309        self.sr
310    }
311    /// Absolute 0-based end row of this view (inclusive).
312    pub fn end_row(&self) -> usize {
313        self.er
314    }
315    /// Absolute 0-based start column of this view.
316    pub fn start_col(&self) -> usize {
317        self.sc
318    }
319    /// Absolute 0-based end column of this view (inclusive).
320    pub fn end_col(&self) -> usize {
321        self.ec
322    }
323    /// Owning sheet name.
324    pub fn sheet_name(&self) -> &str {
325        &self.sheet().name
326    }
327
328    pub fn kind_probe(&self) -> RangeKind {
329        if self.is_empty() {
330            return RangeKind::Empty;
331        }
332
333        let mut has_num = false;
334        let mut has_text = false;
335
336        for r in 0..self.rows {
337            for c in 0..self.cols {
338                match self.get_cell(r, c) {
339                    LiteralValue::Empty => {}
340                    LiteralValue::Number(_) | LiteralValue::Int(_) => has_num = true,
341                    LiteralValue::Text(_) => has_text = true,
342                    _ => return RangeKind::Mixed,
343                }
344                if has_num && has_text {
345                    return RangeKind::Mixed;
346                }
347            }
348        }
349
350        match (has_num, has_text) {
351            (false, false) => RangeKind::Empty,
352            (true, false) => RangeKind::NumericOnly,
353            (false, true) => RangeKind::TextOnly,
354            (true, true) => RangeKind::Mixed,
355        }
356    }
357
358    pub fn as_1x1(&self) -> Option<LiteralValue> {
359        if self.rows == 1 && self.cols == 1 {
360            Some(self.get_cell(0, 0))
361        } else {
362            None
363        }
364    }
365
366    /// Get a specific cell by row and column index (0-based).
367    /// Returns Empty for out-of-bounds access.
368    pub fn get_cell(&self, row: usize, col: usize) -> LiteralValue {
369        if row >= self.rows || col >= self.cols {
370            return LiteralValue::Empty;
371        }
372        let abs_row = self.sr + row;
373        let abs_col = self.sc + col;
374        let sheet = self.sheet();
375        let sheet_rows = sheet.nrows as usize;
376        if abs_row >= sheet_rows {
377            return LiteralValue::Empty;
378        }
379        if abs_col >= sheet.columns.len() {
380            return LiteralValue::Empty;
381        }
382        let col_ref = &sheet.columns[abs_col];
383        // Locate chunk by binary searching start offsets
384        let chunk_starts = &sheet.chunk_starts;
385        let ch_idx = match chunk_starts.binary_search(&abs_row) {
386            Ok(i) => i,
387            Err(0) => 0,
388            Err(i) => i - 1,
389        };
390        let Some(ch) = col_ref.chunk(ch_idx) else {
391            return LiteralValue::Empty;
392        };
393        let row_start = chunk_starts[ch_idx];
394        let in_off = abs_row - row_start;
395        // Overlay takes precedence: user edits over computed over base.
396        if let Some(ov) = ch
397            .overlay
398            .get(in_off)
399            .or_else(|| ch.computed_overlay.get(in_off))
400        {
401            return match ov {
402                arrow_store::OverlayValue::Empty => LiteralValue::Empty,
403                arrow_store::OverlayValue::Number(n) => LiteralValue::Number(*n),
404                arrow_store::OverlayValue::DateTime(serial) => {
405                    LiteralValue::from_serial_number(*serial)
406                }
407                arrow_store::OverlayValue::Duration(serial) => {
408                    let nanos_f = *serial * 86_400.0 * 1_000_000_000.0;
409                    let nanos = nanos_f.round().clamp(i64::MIN as f64, i64::MAX as f64) as i64;
410                    LiteralValue::Duration(chrono::Duration::nanoseconds(nanos))
411                }
412                arrow_store::OverlayValue::Boolean(b) => LiteralValue::Boolean(*b),
413                arrow_store::OverlayValue::Text(s) => LiteralValue::Text((**s).to_string()),
414                arrow_store::OverlayValue::Error(code) => {
415                    let kind = arrow_store::unmap_error_code(*code);
416                    LiteralValue::Error(ExcelError::new(kind))
417                }
418                arrow_store::OverlayValue::Pending => LiteralValue::Pending,
419            };
420        }
421        // Read tag and route to lane
422        let tag_u8 = ch.type_tag.value(in_off);
423        match arrow_store::TypeTag::from_u8(tag_u8) {
424            arrow_store::TypeTag::Empty => LiteralValue::Empty,
425            arrow_store::TypeTag::Number => {
426                if let Some(arr) = &ch.numbers {
427                    if arr.is_null(in_off) {
428                        return LiteralValue::Empty;
429                    }
430                    LiteralValue::Number(arr.value(in_off))
431                } else {
432                    LiteralValue::Empty
433                }
434            }
435            arrow_store::TypeTag::DateTime => {
436                if let Some(arr) = &ch.numbers {
437                    if arr.is_null(in_off) {
438                        return LiteralValue::Empty;
439                    }
440                    LiteralValue::from_serial_number(arr.value(in_off))
441                } else {
442                    LiteralValue::Empty
443                }
444            }
445            arrow_store::TypeTag::Duration => {
446                if let Some(arr) = &ch.numbers {
447                    if arr.is_null(in_off) {
448                        return LiteralValue::Empty;
449                    }
450                    let serial = arr.value(in_off);
451                    let nanos_f = serial * 86_400.0 * 1_000_000_000.0;
452                    let nanos = nanos_f.round().clamp(i64::MIN as f64, i64::MAX as f64) as i64;
453                    LiteralValue::Duration(chrono::Duration::nanoseconds(nanos))
454                } else {
455                    LiteralValue::Empty
456                }
457            }
458            arrow_store::TypeTag::Boolean => {
459                if let Some(arr) = &ch.booleans {
460                    if arr.is_null(in_off) {
461                        return LiteralValue::Empty;
462                    }
463                    LiteralValue::Boolean(arr.value(in_off))
464                } else {
465                    LiteralValue::Empty
466                }
467            }
468            arrow_store::TypeTag::Text => {
469                if let Some(arr) = &ch.text {
470                    if arr.is_null(in_off) {
471                        return LiteralValue::Empty;
472                    }
473                    let sa = arr
474                        .as_any()
475                        .downcast_ref::<arrow_array::StringArray>()
476                        .unwrap();
477                    LiteralValue::Text(sa.value(in_off).to_string())
478                } else {
479                    LiteralValue::Empty
480                }
481            }
482            arrow_store::TypeTag::Error => {
483                if let Some(arr) = &ch.errors {
484                    if arr.is_null(in_off) {
485                        return LiteralValue::Empty;
486                    }
487                    let kind = arrow_store::unmap_error_code(arr.value(in_off));
488                    LiteralValue::Error(ExcelError::new(kind))
489                } else {
490                    LiteralValue::Empty
491                }
492            }
493            arrow_store::TypeTag::Pending => LiteralValue::Pending,
494        }
495    }
496
497    /// Iterate overlapping chunks by row segment.
498    pub fn iter_row_chunks(&self) -> RowChunkIterator<'_> {
499        RowChunkIterator {
500            view: self,
501            current_chunk_idx: 0,
502        }
503    }
504
505    /// Row-major cell traversal.
506    pub fn for_each_cell(
507        &self,
508        f: &mut dyn FnMut(&LiteralValue) -> Result<(), ExcelError>,
509    ) -> Result<(), ExcelError> {
510        for res in self.iter_row_chunks() {
511            let cs = res?;
512            for r in 0..cs.row_len {
513                for c in 0..self.cols {
514                    let tmp = self.get_cell(cs.row_start + r, c);
515                    f(&tmp)?;
516                }
517            }
518        }
519        Ok(())
520    }
521
522    /// Visit each row as a borrowed slice (buffered).
523    pub fn for_each_row(
524        &self,
525        f: &mut dyn FnMut(&[LiteralValue]) -> Result<(), ExcelError>,
526    ) -> Result<(), ExcelError> {
527        let mut buf: Vec<LiteralValue> = Vec::with_capacity(self.cols);
528        for r in 0..self.rows {
529            buf.clear();
530            for c in 0..self.cols {
531                buf.push(self.get_cell(r, c));
532            }
533            f(&buf[..])?;
534        }
535        Ok(())
536    }
537
538    /// Visit each column as a contiguous slice (buffered).
539    pub fn for_each_col(
540        &self,
541        f: &mut dyn FnMut(&[LiteralValue]) -> Result<(), ExcelError>,
542    ) -> Result<(), ExcelError> {
543        let mut col_buf: Vec<LiteralValue> = Vec::with_capacity(self.rows);
544        for c in 0..self.cols {
545            col_buf.clear();
546            for r in 0..self.rows {
547                col_buf.push(self.get_cell(r, c));
548            }
549            f(&col_buf[..])?;
550        }
551        Ok(())
552    }
553
554    /// Get a numeric value at a specific cell, with coercion.
555    /// Returns None for empty cells or non-coercible values.
556    pub fn get_cell_numeric(&self, row: usize, col: usize, policy: CoercionPolicy) -> Option<f64> {
557        if row >= self.rows || col >= self.cols {
558            return None;
559        }
560
561        let val = self.get_cell(row, col);
562        pack_numeric(&val, policy).ok().flatten()
563    }
564
565    /// Numeric chunk iteration with coercion policy.
566    pub fn numbers_chunked(
567        &self,
568        policy: CoercionPolicy,
569        min_chunk: usize,
570        f: &mut dyn FnMut(NumericChunk) -> Result<(), ExcelError>,
571    ) -> Result<(), ExcelError> {
572        // Fast path for Arrow numbers lane when policy allows ignoring non-numeric cells in ranges (standard Excel behavior for SUM/AVERAGE/etc over ranges)
573        if matches!(policy, CoercionPolicy::NumberStrict) {
574            for res in self.numbers_slices() {
575                let (_, _, cols) = res?;
576                for col in cols {
577                    if col.null_count() < col.len() {
578                        let data = col.values();
579                        // If there are nulls, we need to handle them.
580                        // Currently NumericChunk doesn't have a perfect way to represent sparse Arrow slices
581                        // without copying if we want a contiguous f64 slice.
582                        // For now, we can just provide the raw data and the validity mask if it exists.
583
584                        let validity = if col.null_count() > 0 {
585                            // Extract validity mask.
586                            // Note: This is still slightly awkward with the current NumericChunk design.
587                            None // TODO: Implement validity mask propagation
588                        } else {
589                            None
590                        };
591
592                        if col.null_count() == 0 {
593                            f(NumericChunk { data, validity })?;
594                        } else {
595                            // Fallback for nulls: iterate and push to a small buffer
596                            let mut buf = Vec::with_capacity(col.len());
597                            for i in 0..col.len() {
598                                if !col.is_null(i) {
599                                    buf.push(col.value(i));
600                                }
601                            }
602                            if !buf.is_empty() {
603                                f(NumericChunk {
604                                    data: &buf,
605                                    validity: None,
606                                })?;
607                            }
608                        }
609                    }
610                }
611            }
612            return Ok(());
613        }
614
615        let min_chunk = min_chunk.max(1);
616        let mut buf: Vec<f64> = Vec::with_capacity(min_chunk);
617        let mut flush = |buf: &mut Vec<f64>| -> Result<(), ExcelError> {
618            if buf.is_empty() {
619                return Ok(());
620            }
621            // SAFETY: read-only borrow for callback duration
622            let ptr = buf.as_ptr();
623            let len = buf.len();
624            let slice = unsafe { std::slice::from_raw_parts(ptr, len) };
625            let chunk = NumericChunk {
626                data: slice,
627                validity: None,
628            };
629            f(chunk)?;
630            buf.clear();
631            Ok(())
632        };
633
634        self.for_each_cell(&mut |v| {
635            if let Some(n) = pack_numeric(v, policy)? {
636                buf.push(n);
637                if buf.len() >= min_chunk {
638                    flush(&mut buf)?;
639                }
640            }
641            Ok(())
642        })?;
643        flush(&mut buf)?;
644
645        Ok(())
646    }
647
648    /// Typed numeric slices per row-segment: (row_start, row_len, per-column Float64 arrays)
649    pub fn numbers_slices(
650        &self,
651    ) -> impl Iterator<Item = Result<(usize, usize, Vec<Arc<arrow_array::Float64Array>>), ExcelError>> + '_
652    {
653        use crate::compute_prelude::zip_select;
654        use arrow_array::builder::{BooleanBuilder, Float64Builder};
655
656        self.iter_row_chunks().map(move |res| {
657            let cs = res?;
658            let mut out_cols: Vec<Arc<arrow_array::Float64Array>> =
659                Vec::with_capacity(cs.cols.len());
660            let sheet = self.sheet();
661            let chunk_starts = &sheet.chunk_starts;
662
663            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
664                let base = cs.cols[local_c]
665                    .numbers
666                    .as_ref()
667                    .expect("numbers lane exists")
668                    .clone();
669                let base_fa = base
670                    .as_any()
671                    .downcast_ref::<arrow_array::Float64Array>()
672                    .unwrap()
673                    .clone();
674                let base_arc = Arc::new(base_fa);
675
676                // Identify chunk and overlay segment
677                let abs_seg_start = self.sr + cs.row_start;
678                let ch_idx = match chunk_starts.binary_search(&abs_seg_start) {
679                    Ok(i) => i,
680                    Err(0) => 0,
681                    Err(i) => i - 1,
682                };
683                if col_idx >= sheet.columns.len() {
684                    out_cols.push(base_arc);
685                    continue;
686                }
687                let col = &sheet.columns[col_idx];
688                let Some(ch) = col.chunk(ch_idx) else {
689                    out_cols.push(base_arc);
690                    continue;
691                };
692                let rel_off = (self.sr + cs.row_start) - chunk_starts[ch_idx];
693                let seg_range = rel_off..(rel_off + cs.row_len);
694                let has_overlay = ch.overlay.any_in_range(seg_range.clone())
695                    || (!ch.computed_overlay.is_empty()
696                        && ch.computed_overlay.any_in_range(seg_range.clone()));
697                if has_overlay {
698                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
699                    let mut ob = Float64Builder::with_capacity(cs.row_len);
700                    for i in 0..cs.row_len {
701                        if let Some(ov) = ch
702                            .overlay
703                            .get(rel_off + i)
704                            .or_else(|| ch.computed_overlay.get(rel_off + i))
705                        {
706                            mask_b.append_value(true);
707                            match ov {
708                                arrow_store::OverlayValue::Number(n)
709                                | arrow_store::OverlayValue::DateTime(n)
710                                | arrow_store::OverlayValue::Duration(n) => ob.append_value(*n),
711                                _ => ob.append_null(),
712                            }
713                        } else {
714                            mask_b.append_value(false);
715                            ob.append_null();
716                        }
717                    }
718                    let mask = mask_b.finish();
719                    let overlay_vals = ob.finish();
720                    let base_fa = base
721                        .as_any()
722                        .downcast_ref::<arrow_array::Float64Array>()
723                        .unwrap();
724                    let zipped = zip_select(&mask, &overlay_vals, base_fa).expect("zip overlay");
725                    let fa = zipped
726                        .as_any()
727                        .downcast_ref::<arrow_array::Float64Array>()
728                        .unwrap()
729                        .clone();
730                    out_cols.push(Arc::new(fa));
731                } else {
732                    out_cols.push(base_arc);
733                }
734            }
735            Ok((cs.row_start, cs.row_len, out_cols))
736        })
737    }
738
739    /// Typed boolean slices per row-segment, overlay-aware via zip.
740    pub fn booleans_slices(
741        &self,
742    ) -> impl Iterator<Item = Result<(usize, usize, Vec<Arc<arrow_array::BooleanArray>>), ExcelError>> + '_
743    {
744        use crate::compute_prelude::zip_select;
745        use arrow_array::builder::BooleanBuilder;
746
747        self.iter_row_chunks().map(move |res| {
748            let cs = res?;
749            let mut out_cols: Vec<Arc<arrow_array::BooleanArray>> =
750                Vec::with_capacity(cs.cols.len());
751            let sheet = self.sheet();
752            let chunk_starts = &sheet.chunk_starts;
753
754            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
755                let base = cs.cols[local_c]
756                    .booleans
757                    .as_ref()
758                    .expect("booleans lane exists")
759                    .clone();
760                let base_ba = base
761                    .as_any()
762                    .downcast_ref::<arrow_array::BooleanArray>()
763                    .unwrap()
764                    .clone();
765                let base_arc = Arc::new(base_ba);
766
767                // Identify chunk and overlay segment
768                let abs_seg_start = self.sr + cs.row_start;
769                let ch_idx = match chunk_starts.binary_search(&abs_seg_start) {
770                    Ok(i) => i,
771                    Err(0) => 0,
772                    Err(i) => i - 1,
773                };
774                if col_idx >= sheet.columns.len() {
775                    out_cols.push(base_arc);
776                    continue;
777                }
778                let col = &sheet.columns[col_idx];
779                let Some(ch) = col.chunk(ch_idx) else {
780                    out_cols.push(base_arc);
781                    continue;
782                };
783                let rel_off = (self.sr + cs.row_start) - chunk_starts[ch_idx];
784                let seg_range = rel_off..(rel_off + cs.row_len);
785                let has_overlay = ch.overlay.any_in_range(seg_range.clone())
786                    || (!ch.computed_overlay.is_empty()
787                        && ch.computed_overlay.any_in_range(seg_range.clone()));
788                if has_overlay {
789                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
790                    let mut bb = BooleanBuilder::with_capacity(cs.row_len);
791                    for i in 0..cs.row_len {
792                        if let Some(ov) = ch
793                            .overlay
794                            .get(rel_off + i)
795                            .or_else(|| ch.computed_overlay.get(rel_off + i))
796                        {
797                            mask_b.append_value(true);
798                            match ov {
799                                arrow_store::OverlayValue::Boolean(b) => bb.append_value(*b),
800                                _ => bb.append_null(),
801                            }
802                        } else {
803                            mask_b.append_value(false);
804                            bb.append_null();
805                        }
806                    }
807                    let mask = mask_b.finish();
808                    let overlay_vals = bb.finish();
809                    let base_ba = base
810                        .as_any()
811                        .downcast_ref::<arrow_array::BooleanArray>()
812                        .unwrap();
813                    let zipped =
814                        zip_select(&mask, &overlay_vals, base_ba).expect("zip boolean overlay");
815                    let ba = zipped
816                        .as_any()
817                        .downcast_ref::<arrow_array::BooleanArray>()
818                        .unwrap()
819                        .clone();
820                    out_cols.push(Arc::new(ba));
821                } else {
822                    out_cols.push(base_arc);
823                }
824            }
825            Ok((cs.row_start, cs.row_len, out_cols))
826        })
827    }
828
829    /// Text slices per row-segment (erased as ArrayRef for Utf8 today; future Dict/View support).
830    pub fn text_slices(
831        &self,
832    ) -> impl Iterator<Item = Result<(usize, usize, Vec<arrow_array::ArrayRef>), ExcelError>> + '_
833    {
834        use crate::compute_prelude::zip_select;
835        use arrow_array::builder::{BooleanBuilder, StringBuilder};
836
837        self.iter_row_chunks().map(move |res| {
838            let cs = res?;
839            let mut out_cols: Vec<arrow_array::ArrayRef> = Vec::with_capacity(cs.cols.len());
840            let sheet = self.sheet();
841            let chunk_starts = &sheet.chunk_starts;
842
843            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
844                let base = cs.cols[local_c]
845                    .text
846                    .as_ref()
847                    .expect("text lane exists")
848                    .clone();
849                let abs_seg_start = self.sr + cs.row_start;
850                let ch_idx = match chunk_starts.binary_search(&abs_seg_start) {
851                    Ok(i) => i,
852                    Err(0) => 0,
853                    Err(i) => i - 1,
854                };
855                if col_idx >= sheet.columns.len() {
856                    out_cols.push(base.clone());
857                    continue;
858                }
859                let col = &sheet.columns[col_idx];
860                let Some(ch) = col.chunk(ch_idx) else {
861                    out_cols.push(base.clone());
862                    continue;
863                };
864                let rel_off = (self.sr + cs.row_start) - chunk_starts[ch_idx];
865                let seg_range = rel_off..(rel_off + cs.row_len);
866                let has_overlay = ch.overlay.any_in_range(seg_range.clone())
867                    || (!ch.computed_overlay.is_empty()
868                        && ch.computed_overlay.any_in_range(seg_range.clone()));
869                if has_overlay {
870                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
871                    let mut sb = StringBuilder::with_capacity(cs.row_len, cs.row_len * 8);
872                    for i in 0..cs.row_len {
873                        if let Some(ov) = ch
874                            .overlay
875                            .get(rel_off + i)
876                            .or_else(|| ch.computed_overlay.get(rel_off + i))
877                        {
878                            mask_b.append_value(true);
879                            match ov {
880                                arrow_store::OverlayValue::Text(s) => sb.append_value(s),
881                                _ => sb.append_null(),
882                            }
883                        } else {
884                            mask_b.append_value(false);
885                            sb.append_null();
886                        }
887                    }
888                    let mask = mask_b.finish();
889                    let overlay_vals = sb.finish();
890                    let base_sa = base
891                        .as_any()
892                        .downcast_ref::<arrow_array::StringArray>()
893                        .unwrap();
894                    let zipped =
895                        zip_select(&mask, &overlay_vals, base_sa).expect("zip text overlay");
896                    out_cols.push(zipped);
897                } else {
898                    out_cols.push(base.clone());
899                }
900            }
901            Ok((cs.row_start, cs.row_len, out_cols))
902        })
903    }
904
905    /// Typed lowered text slices per row-segment, overlay-aware via zip.
906    pub fn lowered_text_slices(
907        &self,
908    ) -> impl Iterator<Item = Result<(usize, usize, Vec<Arc<arrow_array::StringArray>>), ExcelError>> + '_
909    {
910        use crate::compute_prelude::zip_select;
911        use arrow_array::builder::{BooleanBuilder, StringBuilder};
912
913        self.iter_row_chunks().map(move |res| {
914            let cs = res?;
915            let mut out_cols: Vec<Arc<arrow_array::StringArray>> =
916                Vec::with_capacity(cs.cols.len());
917            let sheet = self.sheet();
918            let chunk_starts = &sheet.chunk_starts;
919
920            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
921                // Identify chunk
922                let abs_seg_start = self.sr + cs.row_start;
923                let ch_idx = match chunk_starts.binary_search(&abs_seg_start) {
924                    Ok(i) => i,
925                    Err(0) => 0,
926                    Err(i) => i - 1,
927                };
928                if col_idx >= sheet.columns.len() {
929                    out_cols.push(Arc::new(arrow_array::StringArray::new_null(cs.row_len)));
930                    continue;
931                }
932                let col = &sheet.columns[col_idx];
933                let Some(ch) = col.chunk(ch_idx) else {
934                    out_cols.push(Arc::new(arrow_array::StringArray::new_null(cs.row_len)));
935                    continue;
936                };
937                let rel_off = (self.sr + cs.row_start) - chunk_starts[ch_idx];
938                let seg_range = rel_off..(rel_off + cs.row_len);
939
940                // Check overlay
941                let has_overlay = ch.overlay.any_in_range(seg_range.clone())
942                    || (!ch.computed_overlay.is_empty()
943                        && ch.computed_overlay.any_in_range(seg_range.clone()));
944
945                let base_lowered = ch.text_lower_or_null();
946                let base_seg = base_lowered.slice(rel_off, cs.row_len);
947                let base_sa = base_seg
948                    .as_any()
949                    .downcast_ref::<arrow_array::StringArray>()
950                    .expect("lowered slice downcast");
951
952                if has_overlay {
953                    // Build lowered overlay values builder
954                    let mut sb = StringBuilder::with_capacity(cs.row_len, cs.row_len * 8);
955                    let mut mb = BooleanBuilder::with_capacity(cs.row_len);
956                    for i in 0..cs.row_len {
957                        if let Some(ov) = ch
958                            .overlay
959                            .get(rel_off + i)
960                            .or_else(|| ch.computed_overlay.get(rel_off + i))
961                        {
962                            mb.append_value(true);
963                            match ov {
964                                arrow_store::OverlayValue::Text(s) => {
965                                    sb.append_value(s.to_ascii_lowercase());
966                                }
967                                arrow_store::OverlayValue::Empty => {
968                                    sb.append_null();
969                                }
970                                arrow_store::OverlayValue::Number(n)
971                                | arrow_store::OverlayValue::DateTime(n)
972                                | arrow_store::OverlayValue::Duration(n) => {
973                                    sb.append_value(n.to_string());
974                                }
975                                arrow_store::OverlayValue::Boolean(b) => {
976                                    sb.append_value(if *b { "true" } else { "false" });
977                                }
978                                arrow_store::OverlayValue::Error(_)
979                                | arrow_store::OverlayValue::Pending => {
980                                    sb.append_null();
981                                }
982                            }
983                        } else {
984                            sb.append_null();
985                            mb.append_value(false);
986                        }
987                    }
988                    let overlay_vals = sb.finish();
989                    let mask = mb.finish();
990                    let zipped = zip_select(&mask, &overlay_vals, base_sa)
991                        .expect("zip lowered text overlay");
992                    let za = zipped
993                        .as_any()
994                        .downcast_ref::<arrow_array::StringArray>()
995                        .unwrap()
996                        .clone();
997                    out_cols.push(Arc::new(za));
998                } else {
999                    out_cols.push(Arc::new(base_sa.clone()));
1000                }
1001            }
1002            Ok((cs.row_start, cs.row_len, out_cols))
1003        })
1004    }
1005
1006    /// Typed error-code slices per row-segment.
1007    pub fn errors_slices(
1008        &self,
1009    ) -> impl Iterator<Item = Result<(usize, usize, Vec<Arc<arrow_array::UInt8Array>>), ExcelError>> + '_
1010    {
1011        use crate::compute_prelude::zip_select;
1012        use arrow_array::builder::{BooleanBuilder, UInt8Builder};
1013
1014        self.iter_row_chunks().map(move |res| {
1015            let cs = res?;
1016            let mut out_cols: Vec<Arc<arrow_array::UInt8Array>> = Vec::with_capacity(cs.cols.len());
1017            let sheet = self.sheet();
1018            let chunk_starts = &sheet.chunk_starts;
1019
1020            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
1021                let base = cs.cols[local_c]
1022                    .errors
1023                    .as_ref()
1024                    .expect("errors lane exists")
1025                    .clone();
1026                let base_e = base
1027                    .as_any()
1028                    .downcast_ref::<arrow_array::UInt8Array>()
1029                    .unwrap()
1030                    .clone();
1031                let base_arc: Arc<arrow_array::UInt8Array> = Arc::new(base_e);
1032                let abs_seg_start = self.sr + cs.row_start;
1033                let ch_idx = match chunk_starts.binary_search(&abs_seg_start) {
1034                    Ok(i) => i,
1035                    Err(0) => 0,
1036                    Err(i) => i - 1,
1037                };
1038                if col_idx >= sheet.columns.len() {
1039                    out_cols.push(base_arc);
1040                    continue;
1041                }
1042                let col = &sheet.columns[col_idx];
1043                let Some(ch) = col.chunk(ch_idx) else {
1044                    out_cols.push(base_arc);
1045                    continue;
1046                };
1047                let rel_off = (self.sr + cs.row_start) - chunk_starts[ch_idx];
1048                let seg_range = rel_off..(rel_off + cs.row_len);
1049                let has_overlay = ch.overlay.any_in_range(seg_range.clone())
1050                    || (!ch.computed_overlay.is_empty()
1051                        && ch.computed_overlay.any_in_range(seg_range.clone()));
1052                if has_overlay {
1053                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
1054                    let mut eb = UInt8Builder::with_capacity(cs.row_len);
1055                    for i in 0..cs.row_len {
1056                        if let Some(ov) = ch
1057                            .overlay
1058                            .get(rel_off + i)
1059                            .or_else(|| ch.computed_overlay.get(rel_off + i))
1060                        {
1061                            mask_b.append_value(true);
1062                            match ov {
1063                                arrow_store::OverlayValue::Error(code) => eb.append_value(*code),
1064                                _ => eb.append_null(),
1065                            }
1066                        } else {
1067                            mask_b.append_value(false);
1068                            eb.append_null();
1069                        }
1070                    }
1071                    let mask = mask_b.finish();
1072                    let overlay_vals = eb.finish();
1073                    let base_ea = base
1074                        .as_any()
1075                        .downcast_ref::<arrow_array::UInt8Array>()
1076                        .unwrap();
1077                    let zipped =
1078                        zip_select(&mask, &overlay_vals, base_ea).expect("zip err overlay");
1079                    let ea = zipped
1080                        .as_any()
1081                        .downcast_ref::<arrow_array::UInt8Array>()
1082                        .unwrap()
1083                        .clone();
1084                    out_cols.push(Arc::new(ea));
1085                } else {
1086                    out_cols.push(base_arc);
1087                }
1088            }
1089            Ok((cs.row_start, cs.row_len, out_cols))
1090        })
1091    }
1092
1093    /// Typed type-tag slices per row-segment.
1094    pub fn type_tags_slices(
1095        &self,
1096    ) -> impl Iterator<Item = Result<(usize, usize, Vec<Arc<arrow_array::UInt8Array>>), ExcelError>> + '_
1097    {
1098        use crate::compute_prelude::zip_select;
1099        use arrow_array::builder::{BooleanBuilder, UInt8Builder};
1100
1101        self.iter_row_chunks().map(move |res| {
1102            let cs = res?;
1103            let mut out_cols: Vec<Arc<arrow_array::UInt8Array>> = Vec::with_capacity(cs.cols.len());
1104            let sheet = self.sheet();
1105            let chunk_starts = &sheet.chunk_starts;
1106
1107            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
1108                let base = cs.cols[local_c].type_tag.clone();
1109                let base_ta = base
1110                    .as_any()
1111                    .downcast_ref::<arrow_array::UInt8Array>()
1112                    .unwrap()
1113                    .clone();
1114                let base_arc = Arc::new(base_ta);
1115
1116                let abs_seg_start = self.sr + cs.row_start;
1117                let ch_idx = match chunk_starts.binary_search(&abs_seg_start) {
1118                    Ok(i) => i,
1119                    Err(0) => 0,
1120                    Err(i) => i - 1,
1121                };
1122                if col_idx >= sheet.columns.len() {
1123                    out_cols.push(base_arc);
1124                    continue;
1125                }
1126                let col = &sheet.columns[col_idx];
1127                let Some(ch) = col.chunk(ch_idx) else {
1128                    out_cols.push(base_arc);
1129                    continue;
1130                };
1131                let rel_off = (self.sr + cs.row_start) - chunk_starts[ch_idx];
1132                let seg_range = rel_off..(rel_off + cs.row_len);
1133                let has_overlay = ch.overlay.any_in_range(seg_range.clone())
1134                    || (!ch.computed_overlay.is_empty()
1135                        && ch.computed_overlay.any_in_range(seg_range.clone()));
1136                if has_overlay {
1137                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
1138                    let mut tb = UInt8Builder::with_capacity(cs.row_len);
1139                    for i in 0..cs.row_len {
1140                        if let Some(ov) = ch
1141                            .overlay
1142                            .get(rel_off + i)
1143                            .or_else(|| ch.computed_overlay.get(rel_off + i))
1144                        {
1145                            mask_b.append_value(true);
1146                            let tag = match ov {
1147                                arrow_store::OverlayValue::Empty => arrow_store::TypeTag::Empty,
1148                                arrow_store::OverlayValue::Number(_) => {
1149                                    arrow_store::TypeTag::Number
1150                                }
1151                                arrow_store::OverlayValue::DateTime(_) => {
1152                                    arrow_store::TypeTag::DateTime
1153                                }
1154                                arrow_store::OverlayValue::Duration(_) => {
1155                                    arrow_store::TypeTag::Duration
1156                                }
1157                                arrow_store::OverlayValue::Boolean(_) => {
1158                                    arrow_store::TypeTag::Boolean
1159                                }
1160                                arrow_store::OverlayValue::Text(_) => arrow_store::TypeTag::Text,
1161                                arrow_store::OverlayValue::Error(_) => arrow_store::TypeTag::Error,
1162                                arrow_store::OverlayValue::Pending => arrow_store::TypeTag::Pending,
1163                            };
1164                            tb.append_value(tag as u8);
1165                        } else {
1166                            mask_b.append_value(false);
1167                            tb.append_null();
1168                        }
1169                    }
1170                    let mask = mask_b.finish();
1171                    let overlay_vals = tb.finish();
1172                    let base_ta = base
1173                        .as_any()
1174                        .downcast_ref::<arrow_array::UInt8Array>()
1175                        .unwrap();
1176                    let zipped =
1177                        zip_select(&mask, &overlay_vals, base_ta).expect("zip tag overlay");
1178                    let ta = zipped
1179                        .as_any()
1180                        .downcast_ref::<arrow_array::UInt8Array>()
1181                        .unwrap()
1182                        .clone();
1183                    out_cols.push(Arc::new(ta));
1184                } else {
1185                    out_cols.push(base_arc);
1186                }
1187            }
1188            Ok((cs.row_start, cs.row_len, out_cols))
1189        })
1190    }
1191
1192    /// Build per-column concatenated lowered text arrays for this view.
1193    /// Uses per-chunk lowered cache for base text and merges overlays via zip_select.
1194    pub fn lowered_text_columns(&self) -> Vec<arrow_array::ArrayRef> {
1195        use crate::compute_prelude::{concat_arrays, zip_select};
1196        use arrow_array::builder::{BooleanBuilder, StringBuilder};
1197
1198        let mut out: Vec<arrow_array::ArrayRef> = Vec::with_capacity(self.cols);
1199        if self.rows == 0 || self.cols == 0 {
1200            return out;
1201        }
1202        let sheet = self.sheet();
1203        let chunk_starts = &sheet.chunk_starts;
1204        // Clamp to physically materialized sheet rows; this view may be logically larger (e.g. A:A).
1205        let sheet_rows = sheet.nrows as usize;
1206        if sheet_rows == 0 || self.sr >= sheet_rows {
1207            for _ in 0..self.cols {
1208                out.push(arrow_array::new_null_array(&DataType::Utf8, 0));
1209            }
1210            return out;
1211        }
1212        let row_end = self.er.min(sheet_rows.saturating_sub(1));
1213        let physical_len = row_end.saturating_sub(self.sr) + 1;
1214        for col_idx in self.sc..=self.ec {
1215            let mut segs: Vec<arrow_array::ArrayRef> = Vec::new();
1216            if col_idx >= sheet.columns.len() {
1217                // OOB: nulls across rows
1218                segs.push(arrow_array::new_null_array(&DataType::Utf8, physical_len));
1219            } else {
1220                let col_ref = &sheet.columns[col_idx];
1221                for (ci, &start) in chunk_starts.iter().enumerate() {
1222                    let chunk_end = chunk_starts
1223                        .get(ci + 1)
1224                        .copied()
1225                        .unwrap_or(sheet.nrows as usize);
1226                    let len = chunk_end.saturating_sub(start);
1227                    if len == 0 {
1228                        continue;
1229                    }
1230                    let end = start + len - 1;
1231                    let is = start.max(self.sr);
1232                    let ie = end.min(row_end);
1233                    if is > ie {
1234                        continue;
1235                    }
1236                    let seg_len = ie - is + 1;
1237                    let rel_off = is - start;
1238                    if let Some(ch) = col_ref.chunk(ci) {
1239                        // Overlay-aware lowered segment
1240                        let has_overlay = ch.overlay.any_in_range(rel_off..(rel_off + seg_len))
1241                            || (!ch.computed_overlay.is_empty()
1242                                && ch
1243                                    .computed_overlay
1244                                    .any_in_range(rel_off..(rel_off + seg_len)));
1245                        if has_overlay {
1246                            // Build lowered overlay values builder
1247                            let mut sb = StringBuilder::with_capacity(seg_len, seg_len * 8);
1248                            // mask overlaid rows
1249                            let mut mb = BooleanBuilder::with_capacity(seg_len);
1250                            for i in 0..seg_len {
1251                                if let Some(ov) = ch
1252                                    .overlay
1253                                    .get(rel_off + i)
1254                                    .or_else(|| ch.computed_overlay.get(rel_off + i))
1255                                {
1256                                    mb.append_value(true);
1257                                    match ov {
1258                                        arrow_store::OverlayValue::Text(s) => {
1259                                            sb.append_value(s.to_ascii_lowercase());
1260                                        }
1261                                        arrow_store::OverlayValue::Empty => {
1262                                            sb.append_null();
1263                                        }
1264                                        arrow_store::OverlayValue::Number(n)
1265                                        | arrow_store::OverlayValue::DateTime(n)
1266                                        | arrow_store::OverlayValue::Duration(n) => {
1267                                            sb.append_value(n.to_string());
1268                                        }
1269                                        arrow_store::OverlayValue::Boolean(b) => {
1270                                            sb.append_value(if *b { "true" } else { "false" });
1271                                        }
1272                                        arrow_store::OverlayValue::Error(_)
1273                                        | arrow_store::OverlayValue::Pending => {
1274                                            sb.append_null();
1275                                        }
1276                                    }
1277                                } else {
1278                                    // not overlaid
1279                                    sb.append_null();
1280                                    mb.append_value(false);
1281                                }
1282                            }
1283                            let overlay_vals = sb.finish();
1284                            let mask = mb.finish();
1285                            // base lowered segment
1286                            let base_lowered = ch.text_lower_or_null();
1287                            let base_seg = base_lowered.slice(rel_off, seg_len);
1288                            let base_sa = base_seg
1289                                .as_any()
1290                                .downcast_ref::<arrow_array::StringArray>()
1291                                .expect("lowered slice downcast");
1292                            let zipped = zip_select(&mask, &overlay_vals, base_sa)
1293                                .expect("zip lowered text overlay");
1294                            segs.push(zipped);
1295                        } else {
1296                            // No overlay: slice from lowered base
1297                            let lowered = ch.text_lower_or_null();
1298                            segs.push(lowered.slice(rel_off, seg_len));
1299                        }
1300                    } else {
1301                        segs.push(arrow_array::new_null_array(&DataType::Utf8, seg_len));
1302                    }
1303                }
1304            }
1305            // Ensure concat has at least one segment (can happen on sparse/empty sheets).
1306            if segs.is_empty() {
1307                segs.push(arrow_array::new_null_array(&DataType::Utf8, physical_len));
1308            }
1309            // Concat segments for this column
1310            let anys: Vec<&dyn arrow_array::Array> = segs
1311                .iter()
1312                .map(|a| a.as_ref() as &dyn arrow_array::Array)
1313                .collect();
1314            let conc = concat_arrays(&anys).expect("concat lowered segments");
1315            out.push(conc);
1316        }
1317        out
1318    }
1319
1320    /// Slice typed float arrays for a specific row interval (relative to view).
1321    pub fn slice_numbers(
1322        &self,
1323        rel_start: usize,
1324        len: usize,
1325    ) -> Vec<Option<Arc<arrow_array::Float64Array>>> {
1326        let abs_start = self.sr + rel_start;
1327        let abs_end = abs_start + len;
1328        let sheet = self.sheet();
1329        let chunk_starts = &sheet.chunk_starts;
1330
1331        let mut out_cols = Vec::with_capacity(self.cols);
1332        for col_idx in self.sc..=self.ec {
1333            if col_idx >= sheet.columns.len() {
1334                out_cols.push(None);
1335                continue;
1336            }
1337            let col = &sheet.columns[col_idx];
1338
1339            let start_ch_idx = match chunk_starts.binary_search(&abs_start) {
1340                Ok(i) => i,
1341                Err(0) => 0,
1342                Err(i) => i - 1,
1343            };
1344
1345            let mut segments: Vec<Arc<arrow_array::Float64Array>> = Vec::new();
1346            let mut null_only = true;
1347
1348            let mut curr = abs_start;
1349            let mut remaining = len;
1350            let mut ch_idx = start_ch_idx;
1351
1352            while remaining > 0 && ch_idx < chunk_starts.len() {
1353                let ch_start = chunk_starts[ch_idx];
1354                let ch_end = chunk_starts
1355                    .get(ch_idx + 1)
1356                    .copied()
1357                    .unwrap_or(sheet.nrows as usize);
1358                let ch_len = ch_end.saturating_sub(ch_start);
1359                if ch_len == 0 {
1360                    ch_idx += 1;
1361                    continue;
1362                }
1363
1364                let overlap_start = curr.max(ch_start);
1365                let overlap_end = ch_end.min(abs_end);
1366
1367                if overlap_start < overlap_end {
1368                    let seg_len = overlap_end - overlap_start;
1369                    let rel_off_in_chunk = overlap_start - ch_start;
1370
1371                    if let Some(ch) = col.chunk(ch_idx) {
1372                        let base_nums_arc = ch.numbers_or_null();
1373                        let base_nums = base_nums_arc.as_ref();
1374
1375                        let seg_range = rel_off_in_chunk..(rel_off_in_chunk + seg_len);
1376                        let has_overlay = ch.overlay.any_in_range(seg_range.clone())
1377                            || (!ch.computed_overlay.is_empty()
1378                                && ch.computed_overlay.any_in_range(seg_range.clone()));
1379
1380                        let final_arr = if has_overlay {
1381                            let mut nb =
1382                                arrow_array::builder::Float64Builder::with_capacity(seg_len);
1383                            let mut mask_b =
1384                                arrow_array::builder::BooleanBuilder::with_capacity(seg_len);
1385                            for i in 0..seg_len {
1386                                if let Some(ov) = ch
1387                                    .overlay
1388                                    .get(rel_off_in_chunk + i)
1389                                    .or_else(|| ch.computed_overlay.get(rel_off_in_chunk + i))
1390                                {
1391                                    mask_b.append_value(true);
1392                                    match ov {
1393                                        arrow_store::OverlayValue::Number(n)
1394                                        | arrow_store::OverlayValue::DateTime(n)
1395                                        | arrow_store::OverlayValue::Duration(n) => {
1396                                            nb.append_value(*n)
1397                                        }
1398                                        _ => nb.append_null(),
1399                                    }
1400                                } else {
1401                                    mask_b.append_value(false);
1402                                    nb.append_null();
1403                                }
1404                            }
1405                            let mask = mask_b.finish();
1406                            let overlay_vals = nb.finish();
1407                            let base_slice = base_nums.slice(rel_off_in_chunk, seg_len);
1408                            let base_fa = base_slice
1409                                .as_any()
1410                                .downcast_ref::<arrow_array::Float64Array>()
1411                                .unwrap();
1412                            let zipped =
1413                                crate::compute_prelude::zip_select(&mask, &overlay_vals, base_fa)
1414                                    .expect("zip slice");
1415                            zipped
1416                                .as_any()
1417                                .downcast_ref::<arrow_array::Float64Array>()
1418                                .unwrap()
1419                                .clone()
1420                        } else {
1421                            let sl = base_nums.slice(rel_off_in_chunk, seg_len);
1422                            sl.as_any()
1423                                .downcast_ref::<arrow_array::Float64Array>()
1424                                .unwrap()
1425                                .clone()
1426                        };
1427
1428                        if final_arr.null_count() < final_arr.len() {
1429                            null_only = false;
1430                        }
1431                        segments.push(Arc::new(final_arr));
1432                    } else {
1433                        segments.push(Arc::new(arrow_array::Float64Array::new_null(seg_len)));
1434                    }
1435                    curr += seg_len;
1436                    remaining -= seg_len;
1437                }
1438                ch_idx += 1;
1439            }
1440
1441            if remaining > 0 {
1442                segments.push(Arc::new(arrow_array::Float64Array::new_null(remaining)));
1443            }
1444
1445            if segments.len() == 1 {
1446                if null_only && segments[0].null_count() == segments[0].len() {
1447                    out_cols.push(None);
1448                } else {
1449                    out_cols.push(Some(segments.pop().unwrap()));
1450                }
1451            } else {
1452                let refs: Vec<&dyn Array> =
1453                    segments.iter().map(|a| a.as_ref() as &dyn Array).collect();
1454                let c = crate::compute_prelude::concat_arrays(&refs).expect("concat slice");
1455                let fa = c
1456                    .as_any()
1457                    .downcast_ref::<arrow_array::Float64Array>()
1458                    .unwrap()
1459                    .clone();
1460                out_cols.push(Some(Arc::new(fa)));
1461            }
1462        }
1463        out_cols
1464    }
1465
1466    /// Slice typed lowered text arrays for a specific row interval (relative to view).
1467    pub fn slice_lowered_text(
1468        &self,
1469        rel_start: usize,
1470        len: usize,
1471    ) -> Vec<Option<Arc<arrow_array::StringArray>>> {
1472        let abs_start = self.sr + rel_start;
1473        let abs_end = abs_start + len;
1474        let sheet = self.sheet();
1475        let chunk_starts = &sheet.chunk_starts;
1476
1477        let mut out_cols = Vec::with_capacity(self.cols);
1478        for col_idx in self.sc..=self.ec {
1479            if col_idx >= sheet.columns.len() {
1480                out_cols.push(None);
1481                continue;
1482            }
1483            let col = &sheet.columns[col_idx];
1484            let start_ch_idx = match chunk_starts.binary_search(&abs_start) {
1485                Ok(i) => i,
1486                Err(0) => 0,
1487                Err(i) => i - 1,
1488            };
1489
1490            let mut segments: Vec<Arc<arrow_array::StringArray>> = Vec::new();
1491            let mut null_only = true;
1492
1493            let mut curr = abs_start;
1494            let mut remaining = len;
1495            let mut ch_idx = start_ch_idx;
1496
1497            while remaining > 0 && ch_idx < chunk_starts.len() {
1498                let ch_start = chunk_starts[ch_idx];
1499                let ch_end = chunk_starts
1500                    .get(ch_idx + 1)
1501                    .copied()
1502                    .unwrap_or(sheet.nrows as usize);
1503                let ch_len = ch_end.saturating_sub(ch_start);
1504                if ch_len == 0 {
1505                    ch_idx += 1;
1506                    continue;
1507                }
1508
1509                let overlap_start = curr.max(ch_start);
1510                let overlap_end = ch_end.min(abs_end);
1511
1512                if overlap_start < overlap_end {
1513                    let seg_len = overlap_end - overlap_start;
1514                    let rel_off_in_chunk = overlap_start - ch_start;
1515
1516                    if let Some(ch) = col.chunk(ch_idx) {
1517                        let base_lowered = ch.text_lower_or_null();
1518                        let seg_range = rel_off_in_chunk..(rel_off_in_chunk + seg_len);
1519                        let has_overlay = ch.overlay.any_in_range(seg_range.clone())
1520                            || (!ch.computed_overlay.is_empty()
1521                                && ch.computed_overlay.any_in_range(seg_range.clone()));
1522
1523                        let final_arr = if has_overlay {
1524                            let mut sb = arrow_array::builder::StringBuilder::with_capacity(
1525                                seg_len,
1526                                seg_len * 8,
1527                            );
1528                            let mut mask_b =
1529                                arrow_array::builder::BooleanBuilder::with_capacity(seg_len);
1530                            for i in 0..seg_len {
1531                                if let Some(ov) = ch
1532                                    .overlay
1533                                    .get(rel_off_in_chunk + i)
1534                                    .or_else(|| ch.computed_overlay.get(rel_off_in_chunk + i))
1535                                {
1536                                    mask_b.append_value(true);
1537                                    match ov {
1538                                        arrow_store::OverlayValue::Text(s) => {
1539                                            sb.append_value(s.to_ascii_lowercase())
1540                                        }
1541                                        arrow_store::OverlayValue::Number(n)
1542                                        | arrow_store::OverlayValue::DateTime(n)
1543                                        | arrow_store::OverlayValue::Duration(n) => {
1544                                            sb.append_value(n.to_string())
1545                                        }
1546                                        arrow_store::OverlayValue::Boolean(b) => {
1547                                            sb.append_value(if *b { "true" } else { "false" })
1548                                        }
1549                                        _ => sb.append_null(),
1550                                    }
1551                                } else {
1552                                    mask_b.append_value(false);
1553                                    sb.append_null();
1554                                }
1555                            }
1556                            let mask = mask_b.finish();
1557                            let overlay_vals = sb.finish();
1558                            let base_slice = base_lowered.slice(rel_off_in_chunk, seg_len);
1559                            let base_sa = base_slice
1560                                .as_any()
1561                                .downcast_ref::<arrow_array::StringArray>()
1562                                .unwrap();
1563                            let zipped =
1564                                crate::compute_prelude::zip_select(&mask, &overlay_vals, base_sa)
1565                                    .expect("zip text");
1566                            zipped
1567                                .as_any()
1568                                .downcast_ref::<arrow_array::StringArray>()
1569                                .unwrap()
1570                                .clone()
1571                        } else {
1572                            let sl = base_lowered.slice(rel_off_in_chunk, seg_len);
1573                            sl.as_any()
1574                                .downcast_ref::<arrow_array::StringArray>()
1575                                .unwrap()
1576                                .clone()
1577                        };
1578
1579                        if final_arr.null_count() < final_arr.len() {
1580                            null_only = false;
1581                        }
1582                        segments.push(Arc::new(final_arr));
1583                    } else {
1584                        segments.push(Arc::new(arrow_array::StringArray::new_null(seg_len)));
1585                    }
1586                    curr += seg_len;
1587                    remaining -= seg_len;
1588                }
1589                ch_idx += 1;
1590            }
1591
1592            if remaining > 0 {
1593                segments.push(Arc::new(arrow_array::StringArray::new_null(remaining)));
1594            }
1595
1596            if segments.len() == 1 {
1597                if null_only && segments[0].null_count() == segments[0].len() {
1598                    out_cols.push(None);
1599                } else {
1600                    out_cols.push(Some(segments.pop().unwrap()));
1601                }
1602            } else {
1603                let refs: Vec<&dyn Array> =
1604                    segments.iter().map(|a| a.as_ref() as &dyn Array).collect();
1605                let c = crate::compute_prelude::concat_arrays(&refs).expect("concat text");
1606                let sa = c
1607                    .as_any()
1608                    .downcast_ref::<arrow_array::StringArray>()
1609                    .unwrap()
1610                    .clone();
1611                out_cols.push(Some(Arc::new(sa)));
1612            }
1613        }
1614        out_cols
1615    }
1616}
1617
1618#[inline]
1619fn pack_numeric(v: &LiteralValue, policy: CoercionPolicy) -> Result<Option<f64>, ExcelError> {
1620    match policy {
1621        CoercionPolicy::NumberLenientText => match v {
1622            LiteralValue::Error(e) => Err(e.clone()),
1623            LiteralValue::Empty => Ok(None),
1624            other => Ok(crate::coercion::to_number_lenient(other).ok()),
1625        },
1626        CoercionPolicy::NumberStrict => match v {
1627            LiteralValue::Error(e) => Err(e.clone()),
1628            LiteralValue::Empty => Ok(None),
1629            other => Ok(crate::coercion::to_number_strict(other).ok()),
1630        },
1631        _ => match v {
1632            LiteralValue::Error(e) => Err(e.clone()),
1633            _ => Ok(None),
1634        },
1635    }
1636}
1637
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds a 2x3 grid of mixed values through `numbers_chunked` under the lenient
    /// policy and checks the running total of everything the callback receives.
    #[test]
    fn owned_rows_numeric_chunking() {
        let top_row = vec![
            LiteralValue::Number(1.0),
            LiteralValue::Text("x".into()),
            LiteralValue::Number(3.0),
        ];
        let bottom_row = vec![
            LiteralValue::Boolean(true),
            LiteralValue::Empty,
            LiteralValue::Number(2.5),
        ];
        let grid: Vec<Vec<LiteralValue>> = vec![top_row, bottom_row];
        let view = RangeView::from_owned_rows(grid, DateSystem::Excel1900);

        let mut total = 0.0f64;
        let outcome = view.numbers_chunked(CoercionPolicy::NumberLenientText, 2, &mut |chunk| {
            for &value in chunk.data {
                total += value;
            }
            Ok(())
        });
        outcome.unwrap();

        // 1 + 3 + 2.5 from the numeric cells; the remaining 1 presumably comes from
        // the boolean, with the text and empty cells contributing nothing.
        assert!((total - 7.5).abs() < 1e-9);
    }

    /// A view over a single cell exposes that cell through `as_1x1`.
    #[test]
    fn as_1x1_works() {
        let single_cell = vec![vec![LiteralValue::Number(7.0)]];
        let view = RangeView::from_owned_rows(single_cell, DateSystem::Excel1900);
        assert_eq!(view.as_1x1(), Some(LiteralValue::Number(7.0)));
    }
}