Skip to main content

formualizer_eval/engine/
range_view.rs

1use crate::arrow_store;
2use crate::arrow_store::IngestBuilder;
3use crate::stripes::NumericChunk;
4use arrow_array::Array;
5use arrow_schema::DataType;
6use formualizer_common::{CoercionPolicy, DateSystem, ExcelError, LiteralValue};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9
10#[derive(Clone)]
11pub enum RangeBacking<'a> {
12    Borrowed(&'a arrow_store::ArrowSheet),
13    Owned(Arc<arrow_store::ArrowSheet>),
14}
15
16/// Unified view over a 2D range with efficient traversal utilities.
17/// Phase 4: Arrow-only backing.
18#[derive(Clone)]
19pub struct RangeView<'a> {
20    backing: RangeBacking<'a>,
21    sr: usize,
22    sc: usize,
23    er: usize,
24    ec: usize,
25    rows: usize,
26    cols: usize,
27    cancel_token: Option<Arc<AtomicBool>>,
28}
29
30impl<'a> core::fmt::Debug for RangeView<'a> {
31    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
32        f.debug_struct("RangeView")
33            .field("rows", &self.rows)
34            .field("cols", &self.cols)
35            .field("kind", &self.kind_probe())
36            .finish()
37    }
38}
39
/// Coarse classification of the values found in a range.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RangeKind {
    /// No cells, or every cell is empty.
    Empty,
    /// Only numeric cells (besides empty ones).
    NumericOnly,
    /// Only text cells (besides empty ones).
    TextOnly,
    /// Numbers and text together, or any other kind of value.
    Mixed,
}
47
48pub struct ChunkCol {
49    pub numbers: Option<arrow_array::ArrayRef>,
50    pub booleans: Option<arrow_array::ArrayRef>,
51    pub text: Option<arrow_array::ArrayRef>,
52    pub errors: Option<arrow_array::ArrayRef>,
53    pub type_tag: arrow_array::ArrayRef,
54}
55
56pub struct ChunkSlice {
57    pub row_start: usize, // relative to view top
58    pub row_len: usize,
59    pub cols: Vec<ChunkCol>,
60}
61
62pub struct RowChunkIterator<'a> {
63    view: &'a RangeView<'a>,
64    current_chunk_idx: usize,
65}
66
67impl<'a> Iterator for RowChunkIterator<'a> {
68    type Item = Result<ChunkSlice, ExcelError>;
69
70    fn next(&mut self) -> Option<Self::Item> {
71        if let Some(token) = &self.view.cancel_token
72            && token.load(Ordering::Relaxed)
73        {
74            return Some(Err(ExcelError::new(
75                formualizer_common::ExcelErrorKind::Cancelled,
76            )));
77        }
78
79        let sheet = self.view.sheet();
80        let chunk_starts = &sheet.chunk_starts;
81        let sheet_rows = sheet.nrows as usize;
82        let row_end = self.view.er.min(sheet_rows.saturating_sub(1));
83
84        while self.current_chunk_idx < chunk_starts.len() {
85            let ci = self.current_chunk_idx;
86            let start = chunk_starts[ci];
87            self.current_chunk_idx += 1;
88
89            let end = if ci + 1 < chunk_starts.len() {
90                chunk_starts[ci + 1]
91            } else {
92                sheet_rows
93            };
94            let len = end.saturating_sub(start);
95            if len == 0 {
96                continue;
97            }
98            let chunk_end_abs = start + len - 1;
99            let is = start.max(self.view.sr);
100            let ie = chunk_end_abs.min(row_end);
101            if is > ie {
102                continue;
103            }
104            let seg_len = ie - is + 1;
105            let rel_off = is - start;
106
107            let mut cols = Vec::with_capacity(self.view.cols);
108            for col_idx in self.view.sc..=self.view.ec {
109                if col_idx >= sheet.columns.len() {
110                    let numbers = Some(arrow_array::new_null_array(&DataType::Float64, seg_len));
111                    let booleans = Some(arrow_array::new_null_array(&DataType::Boolean, seg_len));
112                    let text = Some(arrow_array::new_null_array(&DataType::Utf8, seg_len));
113                    let errors = Some(arrow_array::new_null_array(&DataType::UInt8, seg_len));
114                    let type_tag: arrow_array::ArrayRef =
115                        Arc::new(arrow_array::UInt8Array::from(vec![
116                            arrow_store::TypeTag::Empty
117                                as u8;
118                            seg_len
119                        ]));
120                    cols.push(ChunkCol {
121                        numbers,
122                        booleans,
123                        text,
124                        errors,
125                        type_tag,
126                    });
127                } else {
128                    let col = &sheet.columns[col_idx];
129                    let Some(ch) = col.chunk(ci) else {
130                        let numbers =
131                            Some(arrow_array::new_null_array(&DataType::Float64, seg_len));
132                        let booleans =
133                            Some(arrow_array::new_null_array(&DataType::Boolean, seg_len));
134                        let text = Some(arrow_array::new_null_array(&DataType::Utf8, seg_len));
135                        let errors = Some(arrow_array::new_null_array(&DataType::UInt8, seg_len));
136                        let type_tag: arrow_array::ArrayRef =
137                            Arc::new(arrow_array::UInt8Array::from(vec![
138                                arrow_store::TypeTag::Empty
139                                    as u8;
140                                seg_len
141                            ]));
142                        cols.push(ChunkCol {
143                            numbers,
144                            booleans,
145                            text,
146                            errors,
147                            type_tag,
148                        });
149                        continue;
150                    };
151
152                    let numbers_base: arrow_array::ArrayRef = ch.numbers_or_null();
153                    let booleans_base: arrow_array::ArrayRef = ch.booleans_or_null();
154                    let text_base: arrow_array::ArrayRef = ch.text_or_null();
155                    let errors_base: arrow_array::ArrayRef = ch.errors_or_null();
156
157                    let numbers = Some(numbers_base.slice(rel_off, seg_len));
158                    let booleans = Some(booleans_base.slice(rel_off, seg_len));
159                    let text = Some(text_base.slice(rel_off, seg_len));
160                    let errors = Some(errors_base.slice(rel_off, seg_len));
161                    let type_tag: arrow_array::ArrayRef =
162                        Arc::new(ch.type_tag.slice(rel_off, seg_len));
163                    cols.push(ChunkCol {
164                        numbers,
165                        booleans,
166                        text,
167                        errors,
168                        type_tag,
169                    });
170                }
171            }
172            return Some(Ok(ChunkSlice {
173                row_start: is - self.view.sr,
174                row_len: seg_len,
175                cols,
176            }));
177        }
178        None
179    }
180}
181
182impl<'a> RangeView<'a> {
183    pub(crate) fn new(
184        backing: RangeBacking<'a>,
185        sr: usize,
186        sc: usize,
187        er: usize,
188        ec: usize,
189        rows: usize,
190        cols: usize,
191    ) -> Self {
192        Self {
193            backing,
194            sr,
195            sc,
196            er,
197            ec,
198            rows,
199            cols,
200            cancel_token: None,
201        }
202    }
203
204    #[must_use]
205    pub fn with_cancel_token(mut self, token: Option<Arc<AtomicBool>>) -> Self {
206        self.cancel_token = token;
207        self
208    }
209
210    #[inline]
211    pub fn sheet(&self) -> &arrow_store::ArrowSheet {
212        match &self.backing {
213            RangeBacking::Borrowed(s) => s,
214            RangeBacking::Owned(s) => s,
215        }
216    }
217
218    pub fn from_owned_rows(
219        rows: Vec<Vec<LiteralValue>>,
220        date_system: DateSystem,
221    ) -> RangeView<'static> {
222        let nrows = rows.len();
223        let ncols = rows.iter().map(|r| r.len()).max().unwrap_or(0);
224
225        let chunk_rows = 32 * 1024;
226        let mut ib = IngestBuilder::new("__tmp", ncols, chunk_rows, date_system);
227
228        for mut r in rows {
229            r.resize(ncols, LiteralValue::Empty);
230            ib.append_row(&r).expect("append_row for RangeView");
231        }
232
233        let sheet = Arc::new(ib.finish());
234
235        if nrows == 0 || ncols == 0 {
236            return RangeView {
237                backing: RangeBacking::Owned(sheet),
238                sr: 1,
239                sc: 1,
240                er: 0,
241                ec: 0,
242                rows: 0,
243                cols: 0,
244                cancel_token: None,
245            };
246        }
247
248        RangeView {
249            backing: RangeBacking::Owned(sheet),
250            sr: 0,
251            sc: 0,
252            er: nrows - 1,
253            ec: ncols - 1,
254            rows: nrows,
255            cols: ncols,
256            cancel_token: None,
257        }
258    }
259
260    pub fn dims(&self) -> (usize, usize) {
261        (self.rows, self.cols)
262    }
263
264    pub fn expand_to(&self, rows: usize, cols: usize) -> RangeView<'a> {
265        let er = self.sr + rows.saturating_sub(1);
266        let ec = self.sc + cols.saturating_sub(1);
267        RangeView {
268            backing: match &self.backing {
269                RangeBacking::Borrowed(s) => RangeBacking::Borrowed(s),
270                RangeBacking::Owned(s) => RangeBacking::Owned(s.clone()),
271            },
272            sr: self.sr,
273            sc: self.sc,
274            er,
275            ec,
276            rows,
277            cols,
278            cancel_token: self.cancel_token.clone(),
279        }
280    }
281
282    pub fn sub_view(&self, rs: usize, cs: usize, rows: usize, cols: usize) -> RangeView<'a> {
283        let abs_sr = self.sr + rs;
284        let abs_sc = self.sc + cs;
285        let er = abs_sr + rows.saturating_sub(1);
286        let ec = abs_sc + cols.saturating_sub(1);
287        RangeView {
288            backing: match &self.backing {
289                RangeBacking::Borrowed(s) => RangeBacking::Borrowed(s),
290                RangeBacking::Owned(s) => RangeBacking::Owned(s.clone()),
291            },
292            sr: abs_sr,
293            sc: abs_sc,
294            er,
295            ec,
296            rows,
297            cols,
298            cancel_token: self.cancel_token.clone(),
299        }
300    }
301
302    #[inline]
303    pub fn is_empty(&self) -> bool {
304        self.rows == 0 || self.cols == 0
305    }
306
307    /// Absolute 0-based start row of this view.
308    pub fn start_row(&self) -> usize {
309        self.sr
310    }
311    /// Absolute 0-based end row of this view (inclusive).
312    pub fn end_row(&self) -> usize {
313        self.er
314    }
315    /// Absolute 0-based start column of this view.
316    pub fn start_col(&self) -> usize {
317        self.sc
318    }
319    /// Absolute 0-based end column of this view (inclusive).
320    pub fn end_col(&self) -> usize {
321        self.ec
322    }
323    /// Owning sheet name.
324    pub fn sheet_name(&self) -> &str {
325        &self.sheet().name
326    }
327
328    pub fn kind_probe(&self) -> RangeKind {
329        if self.is_empty() {
330            return RangeKind::Empty;
331        }
332
333        let mut has_num = false;
334        let mut has_text = false;
335
336        for r in 0..self.rows {
337            for c in 0..self.cols {
338                match self.get_cell(r, c) {
339                    LiteralValue::Empty => {}
340                    LiteralValue::Number(_) | LiteralValue::Int(_) => has_num = true,
341                    LiteralValue::Text(_) => has_text = true,
342                    _ => return RangeKind::Mixed,
343                }
344                if has_num && has_text {
345                    return RangeKind::Mixed;
346                }
347            }
348        }
349
350        match (has_num, has_text) {
351            (false, false) => RangeKind::Empty,
352            (true, false) => RangeKind::NumericOnly,
353            (false, true) => RangeKind::TextOnly,
354            (true, true) => RangeKind::Mixed,
355        }
356    }
357
358    pub fn as_1x1(&self) -> Option<LiteralValue> {
359        if self.rows == 1 && self.cols == 1 {
360            Some(self.get_cell(0, 0))
361        } else {
362            None
363        }
364    }
365
366    /// Get a specific cell by row and column index (0-based).
367    /// Returns Empty for out-of-bounds access.
368    pub fn get_cell(&self, row: usize, col: usize) -> LiteralValue {
369        if row >= self.rows || col >= self.cols {
370            return LiteralValue::Empty;
371        }
372        let abs_row = self.sr + row;
373        let abs_col = self.sc + col;
374        let sheet = self.sheet();
375        let sheet_rows = sheet.nrows as usize;
376        if abs_row >= sheet_rows {
377            return LiteralValue::Empty;
378        }
379        if abs_col >= sheet.columns.len() {
380            return LiteralValue::Empty;
381        }
382        let col_ref = &sheet.columns[abs_col];
383        // Locate chunk by binary searching start offsets
384        let chunk_starts = &sheet.chunk_starts;
385        let ch_idx = match chunk_starts.binary_search(&abs_row) {
386            Ok(i) => i,
387            Err(0) => 0,
388            Err(i) => i - 1,
389        };
390        let Some(ch) = col_ref.chunk(ch_idx) else {
391            return LiteralValue::Empty;
392        };
393        let row_start = chunk_starts[ch_idx];
394        let in_off = abs_row - row_start;
395        // Overlay takes precedence: user edits over computed over base.
396        if let Some(ov) = ch
397            .overlay
398            .get(in_off)
399            .or_else(|| ch.computed_overlay.get(in_off))
400        {
401            return match ov {
402                arrow_store::OverlayValue::Empty => LiteralValue::Empty,
403                arrow_store::OverlayValue::Number(n) => LiteralValue::Number(*n),
404                arrow_store::OverlayValue::Boolean(b) => LiteralValue::Boolean(*b),
405                arrow_store::OverlayValue::Text(s) => LiteralValue::Text((**s).to_string()),
406                arrow_store::OverlayValue::Error(code) => {
407                    let kind = arrow_store::unmap_error_code(*code);
408                    LiteralValue::Error(ExcelError::new(kind))
409                }
410                arrow_store::OverlayValue::Pending => LiteralValue::Pending,
411            };
412        }
413        // Read tag and route to lane
414        let tag_u8 = ch.type_tag.value(in_off);
415        match arrow_store::TypeTag::from_u8(tag_u8) {
416            arrow_store::TypeTag::Empty => LiteralValue::Empty,
417            arrow_store::TypeTag::Number => {
418                if let Some(arr) = &ch.numbers {
419                    if arr.is_null(in_off) {
420                        return LiteralValue::Empty;
421                    }
422                    LiteralValue::Number(arr.value(in_off))
423                } else {
424                    LiteralValue::Empty
425                }
426            }
427            arrow_store::TypeTag::DateTime | arrow_store::TypeTag::Duration => {
428                if let Some(arr) = &ch.numbers {
429                    if arr.is_null(in_off) {
430                        return LiteralValue::Empty;
431                    }
432                    LiteralValue::from_serial_number(arr.value(in_off))
433                } else {
434                    LiteralValue::Empty
435                }
436            }
437            arrow_store::TypeTag::Boolean => {
438                if let Some(arr) = &ch.booleans {
439                    if arr.is_null(in_off) {
440                        return LiteralValue::Empty;
441                    }
442                    LiteralValue::Boolean(arr.value(in_off))
443                } else {
444                    LiteralValue::Empty
445                }
446            }
447            arrow_store::TypeTag::Text => {
448                if let Some(arr) = &ch.text {
449                    if arr.is_null(in_off) {
450                        return LiteralValue::Empty;
451                    }
452                    let sa = arr
453                        .as_any()
454                        .downcast_ref::<arrow_array::StringArray>()
455                        .unwrap();
456                    LiteralValue::Text(sa.value(in_off).to_string())
457                } else {
458                    LiteralValue::Empty
459                }
460            }
461            arrow_store::TypeTag::Error => {
462                if let Some(arr) = &ch.errors {
463                    if arr.is_null(in_off) {
464                        return LiteralValue::Empty;
465                    }
466                    let kind = arrow_store::unmap_error_code(arr.value(in_off));
467                    LiteralValue::Error(ExcelError::new(kind))
468                } else {
469                    LiteralValue::Empty
470                }
471            }
472            arrow_store::TypeTag::Pending => LiteralValue::Pending,
473        }
474    }
475
476    /// Iterate overlapping chunks by row segment.
477    pub fn iter_row_chunks(&self) -> RowChunkIterator<'_> {
478        RowChunkIterator {
479            view: self,
480            current_chunk_idx: 0,
481        }
482    }
483
484    /// Row-major cell traversal.
485    pub fn for_each_cell(
486        &self,
487        f: &mut dyn FnMut(&LiteralValue) -> Result<(), ExcelError>,
488    ) -> Result<(), ExcelError> {
489        for res in self.iter_row_chunks() {
490            let cs = res?;
491            for r in 0..cs.row_len {
492                for c in 0..self.cols {
493                    let tmp = self.get_cell(cs.row_start + r, c);
494                    f(&tmp)?;
495                }
496            }
497        }
498        Ok(())
499    }
500
501    /// Visit each row as a borrowed slice (buffered).
502    pub fn for_each_row(
503        &self,
504        f: &mut dyn FnMut(&[LiteralValue]) -> Result<(), ExcelError>,
505    ) -> Result<(), ExcelError> {
506        let mut buf: Vec<LiteralValue> = Vec::with_capacity(self.cols);
507        for r in 0..self.rows {
508            buf.clear();
509            for c in 0..self.cols {
510                buf.push(self.get_cell(r, c));
511            }
512            f(&buf[..])?;
513        }
514        Ok(())
515    }
516
517    /// Visit each column as a contiguous slice (buffered).
518    pub fn for_each_col(
519        &self,
520        f: &mut dyn FnMut(&[LiteralValue]) -> Result<(), ExcelError>,
521    ) -> Result<(), ExcelError> {
522        let mut col_buf: Vec<LiteralValue> = Vec::with_capacity(self.rows);
523        for c in 0..self.cols {
524            col_buf.clear();
525            for r in 0..self.rows {
526                col_buf.push(self.get_cell(r, c));
527            }
528            f(&col_buf[..])?;
529        }
530        Ok(())
531    }
532
533    /// Get a numeric value at a specific cell, with coercion.
534    /// Returns None for empty cells or non-coercible values.
535    pub fn get_cell_numeric(&self, row: usize, col: usize, policy: CoercionPolicy) -> Option<f64> {
536        if row >= self.rows || col >= self.cols {
537            return None;
538        }
539
540        let val = self.get_cell(row, col);
541        pack_numeric(&val, policy).ok().flatten()
542    }
543
544    /// Numeric chunk iteration with coercion policy.
545    pub fn numbers_chunked(
546        &self,
547        policy: CoercionPolicy,
548        min_chunk: usize,
549        f: &mut dyn FnMut(NumericChunk) -> Result<(), ExcelError>,
550    ) -> Result<(), ExcelError> {
551        // Fast path for Arrow numbers lane when policy allows ignoring non-numeric cells in ranges (standard Excel behavior for SUM/AVERAGE/etc over ranges)
552        if matches!(policy, CoercionPolicy::NumberStrict) {
553            for res in self.numbers_slices() {
554                let (_, _, cols) = res?;
555                for col in cols {
556                    if col.null_count() < col.len() {
557                        let data = col.values();
558                        // If there are nulls, we need to handle them.
559                        // Currently NumericChunk doesn't have a perfect way to represent sparse Arrow slices
560                        // without copying if we want a contiguous f64 slice.
561                        // For now, we can just provide the raw data and the validity mask if it exists.
562
563                        let validity = if col.null_count() > 0 {
564                            // Extract validity mask.
565                            // Note: This is still slightly awkward with the current NumericChunk design.
566                            None // TODO: Implement validity mask propagation
567                        } else {
568                            None
569                        };
570
571                        if col.null_count() == 0 {
572                            f(NumericChunk { data, validity })?;
573                        } else {
574                            // Fallback for nulls: iterate and push to a small buffer
575                            let mut buf = Vec::with_capacity(col.len());
576                            for i in 0..col.len() {
577                                if !col.is_null(i) {
578                                    buf.push(col.value(i));
579                                }
580                            }
581                            if !buf.is_empty() {
582                                f(NumericChunk {
583                                    data: &buf,
584                                    validity: None,
585                                })?;
586                            }
587                        }
588                    }
589                }
590            }
591            return Ok(());
592        }
593
594        let min_chunk = min_chunk.max(1);
595        let mut buf: Vec<f64> = Vec::with_capacity(min_chunk);
596        let mut flush = |buf: &mut Vec<f64>| -> Result<(), ExcelError> {
597            if buf.is_empty() {
598                return Ok(());
599            }
600            // SAFETY: read-only borrow for callback duration
601            let ptr = buf.as_ptr();
602            let len = buf.len();
603            let slice = unsafe { std::slice::from_raw_parts(ptr, len) };
604            let chunk = NumericChunk {
605                data: slice,
606                validity: None,
607            };
608            f(chunk)?;
609            buf.clear();
610            Ok(())
611        };
612
613        self.for_each_cell(&mut |v| {
614            if let Some(n) = pack_numeric(v, policy)? {
615                buf.push(n);
616                if buf.len() >= min_chunk {
617                    flush(&mut buf)?;
618                }
619            }
620            Ok(())
621        })?;
622        flush(&mut buf)?;
623
624        Ok(())
625    }
626
627    /// Typed numeric slices per row-segment: (row_start, row_len, per-column Float64 arrays)
628    pub fn numbers_slices(
629        &self,
630    ) -> impl Iterator<Item = Result<(usize, usize, Vec<Arc<arrow_array::Float64Array>>), ExcelError>> + '_
631    {
632        use crate::compute_prelude::zip_select;
633        use arrow_array::builder::{BooleanBuilder, Float64Builder};
634
635        self.iter_row_chunks().map(move |res| {
636            let cs = res?;
637            let mut out_cols: Vec<Arc<arrow_array::Float64Array>> =
638                Vec::with_capacity(cs.cols.len());
639            let sheet = self.sheet();
640            let chunk_starts = &sheet.chunk_starts;
641
642            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
643                let base = cs.cols[local_c]
644                    .numbers
645                    .as_ref()
646                    .expect("numbers lane exists")
647                    .clone();
648                let base_fa = base
649                    .as_any()
650                    .downcast_ref::<arrow_array::Float64Array>()
651                    .unwrap()
652                    .clone();
653                let base_arc = Arc::new(base_fa);
654
655                // Identify chunk and overlay segment
656                let abs_seg_start = self.sr + cs.row_start;
657                let ch_idx = match chunk_starts.binary_search(&abs_seg_start) {
658                    Ok(i) => i,
659                    Err(0) => 0,
660                    Err(i) => i - 1,
661                };
662                if col_idx >= sheet.columns.len() {
663                    out_cols.push(base_arc);
664                    continue;
665                }
666                let col = &sheet.columns[col_idx];
667                let Some(ch) = col.chunk(ch_idx) else {
668                    out_cols.push(base_arc);
669                    continue;
670                };
671                let rel_off = (self.sr + cs.row_start) - chunk_starts[ch_idx];
672                let seg_range = rel_off..(rel_off + cs.row_len);
673                let has_overlay = ch.overlay.any_in_range(seg_range.clone())
674                    || (!ch.computed_overlay.is_empty()
675                        && ch.computed_overlay.any_in_range(seg_range.clone()));
676                if has_overlay {
677                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
678                    let mut ob = Float64Builder::with_capacity(cs.row_len);
679                    for i in 0..cs.row_len {
680                        if let Some(ov) = ch
681                            .overlay
682                            .get(rel_off + i)
683                            .or_else(|| ch.computed_overlay.get(rel_off + i))
684                        {
685                            mask_b.append_value(true);
686                            match ov {
687                                arrow_store::OverlayValue::Number(n) => ob.append_value(*n),
688                                _ => ob.append_null(),
689                            }
690                        } else {
691                            mask_b.append_value(false);
692                            ob.append_null();
693                        }
694                    }
695                    let mask = mask_b.finish();
696                    let overlay_vals = ob.finish();
697                    let base_fa = base
698                        .as_any()
699                        .downcast_ref::<arrow_array::Float64Array>()
700                        .unwrap();
701                    let zipped = zip_select(&mask, &overlay_vals, base_fa).expect("zip overlay");
702                    let fa = zipped
703                        .as_any()
704                        .downcast_ref::<arrow_array::Float64Array>()
705                        .unwrap()
706                        .clone();
707                    out_cols.push(Arc::new(fa));
708                } else {
709                    out_cols.push(base_arc);
710                }
711            }
712            Ok((cs.row_start, cs.row_len, out_cols))
713        })
714    }
715
716    /// Typed boolean slices per row-segment, overlay-aware via zip.
717    pub fn booleans_slices(
718        &self,
719    ) -> impl Iterator<Item = Result<(usize, usize, Vec<Arc<arrow_array::BooleanArray>>), ExcelError>> + '_
720    {
721        use crate::compute_prelude::zip_select;
722        use arrow_array::builder::BooleanBuilder;
723
724        self.iter_row_chunks().map(move |res| {
725            let cs = res?;
726            let mut out_cols: Vec<Arc<arrow_array::BooleanArray>> =
727                Vec::with_capacity(cs.cols.len());
728            let sheet = self.sheet();
729            let chunk_starts = &sheet.chunk_starts;
730
731            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
732                let base = cs.cols[local_c]
733                    .booleans
734                    .as_ref()
735                    .expect("booleans lane exists")
736                    .clone();
737                let base_ba = base
738                    .as_any()
739                    .downcast_ref::<arrow_array::BooleanArray>()
740                    .unwrap()
741                    .clone();
742                let base_arc = Arc::new(base_ba);
743
744                // Identify chunk and overlay segment
745                let abs_seg_start = self.sr + cs.row_start;
746                let ch_idx = match chunk_starts.binary_search(&abs_seg_start) {
747                    Ok(i) => i,
748                    Err(0) => 0,
749                    Err(i) => i - 1,
750                };
751                if col_idx >= sheet.columns.len() {
752                    out_cols.push(base_arc);
753                    continue;
754                }
755                let col = &sheet.columns[col_idx];
756                let Some(ch) = col.chunk(ch_idx) else {
757                    out_cols.push(base_arc);
758                    continue;
759                };
760                let rel_off = (self.sr + cs.row_start) - chunk_starts[ch_idx];
761                let seg_range = rel_off..(rel_off + cs.row_len);
762                let has_overlay = ch.overlay.any_in_range(seg_range.clone())
763                    || (!ch.computed_overlay.is_empty()
764                        && ch.computed_overlay.any_in_range(seg_range.clone()));
765                if has_overlay {
766                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
767                    let mut bb = BooleanBuilder::with_capacity(cs.row_len);
768                    for i in 0..cs.row_len {
769                        if let Some(ov) = ch
770                            .overlay
771                            .get(rel_off + i)
772                            .or_else(|| ch.computed_overlay.get(rel_off + i))
773                        {
774                            mask_b.append_value(true);
775                            match ov {
776                                arrow_store::OverlayValue::Boolean(b) => bb.append_value(*b),
777                                _ => bb.append_null(),
778                            }
779                        } else {
780                            mask_b.append_value(false);
781                            bb.append_null();
782                        }
783                    }
784                    let mask = mask_b.finish();
785                    let overlay_vals = bb.finish();
786                    let base_ba = base
787                        .as_any()
788                        .downcast_ref::<arrow_array::BooleanArray>()
789                        .unwrap();
790                    let zipped =
791                        zip_select(&mask, &overlay_vals, base_ba).expect("zip boolean overlay");
792                    let ba = zipped
793                        .as_any()
794                        .downcast_ref::<arrow_array::BooleanArray>()
795                        .unwrap()
796                        .clone();
797                    out_cols.push(Arc::new(ba));
798                } else {
799                    out_cols.push(base_arc);
800                }
801            }
802            Ok((cs.row_start, cs.row_len, out_cols))
803        })
804    }
805
806    /// Text slices per row-segment (erased as ArrayRef for Utf8 today; future Dict/View support).
807    pub fn text_slices(
808        &self,
809    ) -> impl Iterator<Item = Result<(usize, usize, Vec<arrow_array::ArrayRef>), ExcelError>> + '_
810    {
811        use crate::compute_prelude::zip_select;
812        use arrow_array::builder::{BooleanBuilder, StringBuilder};
813
814        self.iter_row_chunks().map(move |res| {
815            let cs = res?;
816            let mut out_cols: Vec<arrow_array::ArrayRef> = Vec::with_capacity(cs.cols.len());
817            let sheet = self.sheet();
818            let chunk_starts = &sheet.chunk_starts;
819
820            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
821                let base = cs.cols[local_c]
822                    .text
823                    .as_ref()
824                    .expect("text lane exists")
825                    .clone();
826                let abs_seg_start = self.sr + cs.row_start;
827                let ch_idx = match chunk_starts.binary_search(&abs_seg_start) {
828                    Ok(i) => i,
829                    Err(0) => 0,
830                    Err(i) => i - 1,
831                };
832                if col_idx >= sheet.columns.len() {
833                    out_cols.push(base.clone());
834                    continue;
835                }
836                let col = &sheet.columns[col_idx];
837                let Some(ch) = col.chunk(ch_idx) else {
838                    out_cols.push(base.clone());
839                    continue;
840                };
841                let rel_off = (self.sr + cs.row_start) - chunk_starts[ch_idx];
842                let seg_range = rel_off..(rel_off + cs.row_len);
843                let has_overlay = ch.overlay.any_in_range(seg_range.clone())
844                    || (!ch.computed_overlay.is_empty()
845                        && ch.computed_overlay.any_in_range(seg_range.clone()));
846                if has_overlay {
847                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
848                    let mut sb = StringBuilder::with_capacity(cs.row_len, cs.row_len * 8);
849                    for i in 0..cs.row_len {
850                        if let Some(ov) = ch
851                            .overlay
852                            .get(rel_off + i)
853                            .or_else(|| ch.computed_overlay.get(rel_off + i))
854                        {
855                            mask_b.append_value(true);
856                            match ov {
857                                arrow_store::OverlayValue::Text(s) => sb.append_value(s),
858                                _ => sb.append_null(),
859                            }
860                        } else {
861                            mask_b.append_value(false);
862                            sb.append_null();
863                        }
864                    }
865                    let mask = mask_b.finish();
866                    let overlay_vals = sb.finish();
867                    let base_sa = base
868                        .as_any()
869                        .downcast_ref::<arrow_array::StringArray>()
870                        .unwrap();
871                    let zipped =
872                        zip_select(&mask, &overlay_vals, base_sa).expect("zip text overlay");
873                    out_cols.push(zipped);
874                } else {
875                    out_cols.push(base.clone());
876                }
877            }
878            Ok((cs.row_start, cs.row_len, out_cols))
879        })
880    }
881
882    /// Typed lowered text slices per row-segment, overlay-aware via zip.
883    pub fn lowered_text_slices(
884        &self,
885    ) -> impl Iterator<Item = Result<(usize, usize, Vec<Arc<arrow_array::StringArray>>), ExcelError>> + '_
886    {
887        use crate::compute_prelude::zip_select;
888        use arrow_array::builder::{BooleanBuilder, StringBuilder};
889
890        self.iter_row_chunks().map(move |res| {
891            let cs = res?;
892            let mut out_cols: Vec<Arc<arrow_array::StringArray>> =
893                Vec::with_capacity(cs.cols.len());
894            let sheet = self.sheet();
895            let chunk_starts = &sheet.chunk_starts;
896
897            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
898                // Identify chunk
899                let abs_seg_start = self.sr + cs.row_start;
900                let ch_idx = match chunk_starts.binary_search(&abs_seg_start) {
901                    Ok(i) => i,
902                    Err(0) => 0,
903                    Err(i) => i - 1,
904                };
905                if col_idx >= sheet.columns.len() {
906                    out_cols.push(Arc::new(arrow_array::StringArray::new_null(cs.row_len)));
907                    continue;
908                }
909                let col = &sheet.columns[col_idx];
910                let Some(ch) = col.chunk(ch_idx) else {
911                    out_cols.push(Arc::new(arrow_array::StringArray::new_null(cs.row_len)));
912                    continue;
913                };
914                let rel_off = (self.sr + cs.row_start) - chunk_starts[ch_idx];
915                let seg_range = rel_off..(rel_off + cs.row_len);
916
917                // Check overlay
918                let has_overlay = ch.overlay.any_in_range(seg_range.clone())
919                    || (!ch.computed_overlay.is_empty()
920                        && ch.computed_overlay.any_in_range(seg_range.clone()));
921
922                let base_lowered = ch.text_lower_or_null();
923                let base_seg = base_lowered.slice(rel_off, cs.row_len);
924                let base_sa = base_seg
925                    .as_any()
926                    .downcast_ref::<arrow_array::StringArray>()
927                    .expect("lowered slice downcast");
928
929                if has_overlay {
930                    // Build lowered overlay values builder
931                    let mut sb = StringBuilder::with_capacity(cs.row_len, cs.row_len * 8);
932                    let mut mb = BooleanBuilder::with_capacity(cs.row_len);
933                    for i in 0..cs.row_len {
934                        if let Some(ov) = ch
935                            .overlay
936                            .get(rel_off + i)
937                            .or_else(|| ch.computed_overlay.get(rel_off + i))
938                        {
939                            mb.append_value(true);
940                            match ov {
941                                arrow_store::OverlayValue::Text(s) => {
942                                    sb.append_value(s.to_ascii_lowercase());
943                                }
944                                arrow_store::OverlayValue::Empty => {
945                                    sb.append_null();
946                                }
947                                arrow_store::OverlayValue::Number(n) => {
948                                    sb.append_value(n.to_string());
949                                }
950                                arrow_store::OverlayValue::Boolean(b) => {
951                                    sb.append_value(if *b { "true" } else { "false" });
952                                }
953                                arrow_store::OverlayValue::Error(_)
954                                | arrow_store::OverlayValue::Pending => {
955                                    sb.append_null();
956                                }
957                            }
958                        } else {
959                            sb.append_null();
960                            mb.append_value(false);
961                        }
962                    }
963                    let overlay_vals = sb.finish();
964                    let mask = mb.finish();
965                    let zipped = zip_select(&mask, &overlay_vals, base_sa)
966                        .expect("zip lowered text overlay");
967                    let za = zipped
968                        .as_any()
969                        .downcast_ref::<arrow_array::StringArray>()
970                        .unwrap()
971                        .clone();
972                    out_cols.push(Arc::new(za));
973                } else {
974                    out_cols.push(Arc::new(base_sa.clone()));
975                }
976            }
977            Ok((cs.row_start, cs.row_len, out_cols))
978        })
979    }
980
981    /// Typed error-code slices per row-segment.
982    pub fn errors_slices(
983        &self,
984    ) -> impl Iterator<Item = Result<(usize, usize, Vec<Arc<arrow_array::UInt8Array>>), ExcelError>> + '_
985    {
986        use crate::compute_prelude::zip_select;
987        use arrow_array::builder::{BooleanBuilder, UInt8Builder};
988
989        self.iter_row_chunks().map(move |res| {
990            let cs = res?;
991            let mut out_cols: Vec<Arc<arrow_array::UInt8Array>> = Vec::with_capacity(cs.cols.len());
992            let sheet = self.sheet();
993            let chunk_starts = &sheet.chunk_starts;
994
995            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
996                let base = cs.cols[local_c]
997                    .errors
998                    .as_ref()
999                    .expect("errors lane exists")
1000                    .clone();
1001                let base_e = base
1002                    .as_any()
1003                    .downcast_ref::<arrow_array::UInt8Array>()
1004                    .unwrap()
1005                    .clone();
1006                let base_arc: Arc<arrow_array::UInt8Array> = Arc::new(base_e);
1007                let abs_seg_start = self.sr + cs.row_start;
1008                let ch_idx = match chunk_starts.binary_search(&abs_seg_start) {
1009                    Ok(i) => i,
1010                    Err(0) => 0,
1011                    Err(i) => i - 1,
1012                };
1013                if col_idx >= sheet.columns.len() {
1014                    out_cols.push(base_arc);
1015                    continue;
1016                }
1017                let col = &sheet.columns[col_idx];
1018                let Some(ch) = col.chunk(ch_idx) else {
1019                    out_cols.push(base_arc);
1020                    continue;
1021                };
1022                let rel_off = (self.sr + cs.row_start) - chunk_starts[ch_idx];
1023                let seg_range = rel_off..(rel_off + cs.row_len);
1024                let has_overlay = ch.overlay.any_in_range(seg_range.clone())
1025                    || (!ch.computed_overlay.is_empty()
1026                        && ch.computed_overlay.any_in_range(seg_range.clone()));
1027                if has_overlay {
1028                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
1029                    let mut eb = UInt8Builder::with_capacity(cs.row_len);
1030                    for i in 0..cs.row_len {
1031                        if let Some(ov) = ch
1032                            .overlay
1033                            .get(rel_off + i)
1034                            .or_else(|| ch.computed_overlay.get(rel_off + i))
1035                        {
1036                            mask_b.append_value(true);
1037                            match ov {
1038                                arrow_store::OverlayValue::Error(code) => eb.append_value(*code),
1039                                _ => eb.append_null(),
1040                            }
1041                        } else {
1042                            mask_b.append_value(false);
1043                            eb.append_null();
1044                        }
1045                    }
1046                    let mask = mask_b.finish();
1047                    let overlay_vals = eb.finish();
1048                    let base_ea = base
1049                        .as_any()
1050                        .downcast_ref::<arrow_array::UInt8Array>()
1051                        .unwrap();
1052                    let zipped =
1053                        zip_select(&mask, &overlay_vals, base_ea).expect("zip err overlay");
1054                    let ea = zipped
1055                        .as_any()
1056                        .downcast_ref::<arrow_array::UInt8Array>()
1057                        .unwrap()
1058                        .clone();
1059                    out_cols.push(Arc::new(ea));
1060                } else {
1061                    out_cols.push(base_arc);
1062                }
1063            }
1064            Ok((cs.row_start, cs.row_len, out_cols))
1065        })
1066    }
1067
1068    /// Typed type-tag slices per row-segment.
1069    pub fn type_tags_slices(
1070        &self,
1071    ) -> impl Iterator<Item = Result<(usize, usize, Vec<Arc<arrow_array::UInt8Array>>), ExcelError>> + '_
1072    {
1073        use crate::compute_prelude::zip_select;
1074        use arrow_array::builder::{BooleanBuilder, UInt8Builder};
1075
1076        self.iter_row_chunks().map(move |res| {
1077            let cs = res?;
1078            let mut out_cols: Vec<Arc<arrow_array::UInt8Array>> = Vec::with_capacity(cs.cols.len());
1079            let sheet = self.sheet();
1080            let chunk_starts = &sheet.chunk_starts;
1081
1082            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
1083                let base = cs.cols[local_c].type_tag.clone();
1084                let base_ta = base
1085                    .as_any()
1086                    .downcast_ref::<arrow_array::UInt8Array>()
1087                    .unwrap()
1088                    .clone();
1089                let base_arc = Arc::new(base_ta);
1090
1091                let abs_seg_start = self.sr + cs.row_start;
1092                let ch_idx = match chunk_starts.binary_search(&abs_seg_start) {
1093                    Ok(i) => i,
1094                    Err(0) => 0,
1095                    Err(i) => i - 1,
1096                };
1097                if col_idx >= sheet.columns.len() {
1098                    out_cols.push(base_arc);
1099                    continue;
1100                }
1101                let col = &sheet.columns[col_idx];
1102                let Some(ch) = col.chunk(ch_idx) else {
1103                    out_cols.push(base_arc);
1104                    continue;
1105                };
1106                let rel_off = (self.sr + cs.row_start) - chunk_starts[ch_idx];
1107                let seg_range = rel_off..(rel_off + cs.row_len);
1108                let has_overlay = ch.overlay.any_in_range(seg_range.clone())
1109                    || (!ch.computed_overlay.is_empty()
1110                        && ch.computed_overlay.any_in_range(seg_range.clone()));
1111                if has_overlay {
1112                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
1113                    let mut tb = UInt8Builder::with_capacity(cs.row_len);
1114                    for i in 0..cs.row_len {
1115                        if let Some(ov) = ch
1116                            .overlay
1117                            .get(rel_off + i)
1118                            .or_else(|| ch.computed_overlay.get(rel_off + i))
1119                        {
1120                            mask_b.append_value(true);
1121                            let tag = match ov {
1122                                arrow_store::OverlayValue::Empty => arrow_store::TypeTag::Empty,
1123                                arrow_store::OverlayValue::Number(_) => {
1124                                    arrow_store::TypeTag::Number
1125                                }
1126                                arrow_store::OverlayValue::Boolean(_) => {
1127                                    arrow_store::TypeTag::Boolean
1128                                }
1129                                arrow_store::OverlayValue::Text(_) => arrow_store::TypeTag::Text,
1130                                arrow_store::OverlayValue::Error(_) => arrow_store::TypeTag::Error,
1131                                arrow_store::OverlayValue::Pending => arrow_store::TypeTag::Pending,
1132                            };
1133                            tb.append_value(tag as u8);
1134                        } else {
1135                            mask_b.append_value(false);
1136                            tb.append_null();
1137                        }
1138                    }
1139                    let mask = mask_b.finish();
1140                    let overlay_vals = tb.finish();
1141                    let base_ta = base
1142                        .as_any()
1143                        .downcast_ref::<arrow_array::UInt8Array>()
1144                        .unwrap();
1145                    let zipped =
1146                        zip_select(&mask, &overlay_vals, base_ta).expect("zip tag overlay");
1147                    let ta = zipped
1148                        .as_any()
1149                        .downcast_ref::<arrow_array::UInt8Array>()
1150                        .unwrap()
1151                        .clone();
1152                    out_cols.push(Arc::new(ta));
1153                } else {
1154                    out_cols.push(base_arc);
1155                }
1156            }
1157            Ok((cs.row_start, cs.row_len, out_cols))
1158        })
1159    }
1160
1161    /// Build per-column concatenated lowered text arrays for this view.
1162    /// Uses per-chunk lowered cache for base text and merges overlays via zip_select.
1163    pub fn lowered_text_columns(&self) -> Vec<arrow_array::ArrayRef> {
1164        use crate::compute_prelude::{concat_arrays, zip_select};
1165        use arrow_array::builder::{BooleanBuilder, StringBuilder};
1166
1167        let mut out: Vec<arrow_array::ArrayRef> = Vec::with_capacity(self.cols);
1168        if self.rows == 0 || self.cols == 0 {
1169            return out;
1170        }
1171        let sheet = self.sheet();
1172        let chunk_starts = &sheet.chunk_starts;
1173        // Clamp to physically materialized sheet rows; this view may be logically larger (e.g. A:A).
1174        let sheet_rows = sheet.nrows as usize;
1175        if sheet_rows == 0 || self.sr >= sheet_rows {
1176            for _ in 0..self.cols {
1177                out.push(arrow_array::new_null_array(&DataType::Utf8, 0));
1178            }
1179            return out;
1180        }
1181        let row_end = self.er.min(sheet_rows.saturating_sub(1));
1182        let physical_len = row_end.saturating_sub(self.sr) + 1;
1183        for col_idx in self.sc..=self.ec {
1184            let mut segs: Vec<arrow_array::ArrayRef> = Vec::new();
1185            if col_idx >= sheet.columns.len() {
1186                // OOB: nulls across rows
1187                segs.push(arrow_array::new_null_array(&DataType::Utf8, physical_len));
1188            } else {
1189                let col_ref = &sheet.columns[col_idx];
1190                for (ci, &start) in chunk_starts.iter().enumerate() {
1191                    let chunk_end = chunk_starts
1192                        .get(ci + 1)
1193                        .copied()
1194                        .unwrap_or(sheet.nrows as usize);
1195                    let len = chunk_end.saturating_sub(start);
1196                    if len == 0 {
1197                        continue;
1198                    }
1199                    let end = start + len - 1;
1200                    let is = start.max(self.sr);
1201                    let ie = end.min(row_end);
1202                    if is > ie {
1203                        continue;
1204                    }
1205                    let seg_len = ie - is + 1;
1206                    let rel_off = is - start;
1207                    if let Some(ch) = col_ref.chunk(ci) {
1208                        // Overlay-aware lowered segment
1209                        let has_overlay = ch.overlay.any_in_range(rel_off..(rel_off + seg_len))
1210                            || (!ch.computed_overlay.is_empty()
1211                                && ch
1212                                    .computed_overlay
1213                                    .any_in_range(rel_off..(rel_off + seg_len)));
1214                        if has_overlay {
1215                            // Build lowered overlay values builder
1216                            let mut sb = StringBuilder::with_capacity(seg_len, seg_len * 8);
1217                            // mask overlaid rows
1218                            let mut mb = BooleanBuilder::with_capacity(seg_len);
1219                            for i in 0..seg_len {
1220                                if let Some(ov) = ch
1221                                    .overlay
1222                                    .get(rel_off + i)
1223                                    .or_else(|| ch.computed_overlay.get(rel_off + i))
1224                                {
1225                                    mb.append_value(true);
1226                                    match ov {
1227                                        arrow_store::OverlayValue::Text(s) => {
1228                                            sb.append_value(s.to_ascii_lowercase());
1229                                        }
1230                                        arrow_store::OverlayValue::Empty => {
1231                                            sb.append_null();
1232                                        }
1233                                        arrow_store::OverlayValue::Number(n) => {
1234                                            sb.append_value(n.to_string());
1235                                        }
1236                                        arrow_store::OverlayValue::Boolean(b) => {
1237                                            sb.append_value(if *b { "true" } else { "false" });
1238                                        }
1239                                        arrow_store::OverlayValue::Error(_)
1240                                        | arrow_store::OverlayValue::Pending => {
1241                                            sb.append_null();
1242                                        }
1243                                    }
1244                                } else {
1245                                    // not overlaid
1246                                    sb.append_null();
1247                                    mb.append_value(false);
1248                                }
1249                            }
1250                            let overlay_vals = sb.finish();
1251                            let mask = mb.finish();
1252                            // base lowered segment
1253                            let base_lowered = ch.text_lower_or_null();
1254                            let base_seg = base_lowered.slice(rel_off, seg_len);
1255                            let base_sa = base_seg
1256                                .as_any()
1257                                .downcast_ref::<arrow_array::StringArray>()
1258                                .expect("lowered slice downcast");
1259                            let zipped = zip_select(&mask, &overlay_vals, base_sa)
1260                                .expect("zip lowered text overlay");
1261                            segs.push(zipped);
1262                        } else {
1263                            // No overlay: slice from lowered base
1264                            let lowered = ch.text_lower_or_null();
1265                            segs.push(lowered.slice(rel_off, seg_len));
1266                        }
1267                    } else {
1268                        segs.push(arrow_array::new_null_array(&DataType::Utf8, seg_len));
1269                    }
1270                }
1271            }
1272            // Ensure concat has at least one segment (can happen on sparse/empty sheets).
1273            if segs.is_empty() {
1274                segs.push(arrow_array::new_null_array(&DataType::Utf8, physical_len));
1275            }
1276            // Concat segments for this column
1277            let anys: Vec<&dyn arrow_array::Array> = segs
1278                .iter()
1279                .map(|a| a.as_ref() as &dyn arrow_array::Array)
1280                .collect();
1281            let conc = concat_arrays(&anys).expect("concat lowered segments");
1282            out.push(conc);
1283        }
1284        out
1285    }
1286
1287    /// Slice typed float arrays for a specific row interval (relative to view).
1288    pub fn slice_numbers(
1289        &self,
1290        rel_start: usize,
1291        len: usize,
1292    ) -> Vec<Option<Arc<arrow_array::Float64Array>>> {
1293        let abs_start = self.sr + rel_start;
1294        let abs_end = abs_start + len;
1295        let sheet = self.sheet();
1296        let chunk_starts = &sheet.chunk_starts;
1297
1298        let mut out_cols = Vec::with_capacity(self.cols);
1299        for col_idx in self.sc..=self.ec {
1300            if col_idx >= sheet.columns.len() {
1301                out_cols.push(None);
1302                continue;
1303            }
1304            let col = &sheet.columns[col_idx];
1305
1306            let start_ch_idx = match chunk_starts.binary_search(&abs_start) {
1307                Ok(i) => i,
1308                Err(0) => 0,
1309                Err(i) => i - 1,
1310            };
1311
1312            let mut segments: Vec<Arc<arrow_array::Float64Array>> = Vec::new();
1313            let mut null_only = true;
1314
1315            let mut curr = abs_start;
1316            let mut remaining = len;
1317            let mut ch_idx = start_ch_idx;
1318
1319            while remaining > 0 && ch_idx < chunk_starts.len() {
1320                let ch_start = chunk_starts[ch_idx];
1321                let ch_end = chunk_starts
1322                    .get(ch_idx + 1)
1323                    .copied()
1324                    .unwrap_or(sheet.nrows as usize);
1325                let ch_len = ch_end.saturating_sub(ch_start);
1326                if ch_len == 0 {
1327                    ch_idx += 1;
1328                    continue;
1329                }
1330
1331                let overlap_start = curr.max(ch_start);
1332                let overlap_end = ch_end.min(abs_end);
1333
1334                if overlap_start < overlap_end {
1335                    let seg_len = overlap_end - overlap_start;
1336                    let rel_off_in_chunk = overlap_start - ch_start;
1337
1338                    if let Some(ch) = col.chunk(ch_idx) {
1339                        let base_nums_arc = ch.numbers_or_null();
1340                        let base_nums = base_nums_arc.as_ref();
1341
1342                        let seg_range = rel_off_in_chunk..(rel_off_in_chunk + seg_len);
1343                        let has_overlay = ch.overlay.any_in_range(seg_range.clone())
1344                            || (!ch.computed_overlay.is_empty()
1345                                && ch.computed_overlay.any_in_range(seg_range.clone()));
1346
1347                        let final_arr = if has_overlay {
1348                            let mut nb =
1349                                arrow_array::builder::Float64Builder::with_capacity(seg_len);
1350                            let mut mask_b =
1351                                arrow_array::builder::BooleanBuilder::with_capacity(seg_len);
1352                            for i in 0..seg_len {
1353                                if let Some(ov) = ch
1354                                    .overlay
1355                                    .get(rel_off_in_chunk + i)
1356                                    .or_else(|| ch.computed_overlay.get(rel_off_in_chunk + i))
1357                                {
1358                                    mask_b.append_value(true);
1359                                    match ov {
1360                                        arrow_store::OverlayValue::Number(n) => nb.append_value(*n),
1361                                        _ => nb.append_null(),
1362                                    }
1363                                } else {
1364                                    mask_b.append_value(false);
1365                                    nb.append_null();
1366                                }
1367                            }
1368                            let mask = mask_b.finish();
1369                            let overlay_vals = nb.finish();
1370                            let base_slice = base_nums.slice(rel_off_in_chunk, seg_len);
1371                            let base_fa = base_slice
1372                                .as_any()
1373                                .downcast_ref::<arrow_array::Float64Array>()
1374                                .unwrap();
1375                            let zipped =
1376                                crate::compute_prelude::zip_select(&mask, &overlay_vals, base_fa)
1377                                    .expect("zip slice");
1378                            zipped
1379                                .as_any()
1380                                .downcast_ref::<arrow_array::Float64Array>()
1381                                .unwrap()
1382                                .clone()
1383                        } else {
1384                            let sl = base_nums.slice(rel_off_in_chunk, seg_len);
1385                            sl.as_any()
1386                                .downcast_ref::<arrow_array::Float64Array>()
1387                                .unwrap()
1388                                .clone()
1389                        };
1390
1391                        if final_arr.null_count() < final_arr.len() {
1392                            null_only = false;
1393                        }
1394                        segments.push(Arc::new(final_arr));
1395                    } else {
1396                        segments.push(Arc::new(arrow_array::Float64Array::new_null(seg_len)));
1397                    }
1398                    curr += seg_len;
1399                    remaining -= seg_len;
1400                }
1401                ch_idx += 1;
1402            }
1403
1404            if remaining > 0 {
1405                segments.push(Arc::new(arrow_array::Float64Array::new_null(remaining)));
1406            }
1407
1408            if segments.len() == 1 {
1409                if null_only && segments[0].null_count() == segments[0].len() {
1410                    out_cols.push(None);
1411                } else {
1412                    out_cols.push(Some(segments.pop().unwrap()));
1413                }
1414            } else {
1415                let refs: Vec<&dyn Array> =
1416                    segments.iter().map(|a| a.as_ref() as &dyn Array).collect();
1417                let c = crate::compute_prelude::concat_arrays(&refs).expect("concat slice");
1418                let fa = c
1419                    .as_any()
1420                    .downcast_ref::<arrow_array::Float64Array>()
1421                    .unwrap()
1422                    .clone();
1423                out_cols.push(Some(Arc::new(fa)));
1424            }
1425        }
1426        out_cols
1427    }
1428
1429    /// Slice typed lowered text arrays for a specific row interval (relative to view).
1430    pub fn slice_lowered_text(
1431        &self,
1432        rel_start: usize,
1433        len: usize,
1434    ) -> Vec<Option<Arc<arrow_array::StringArray>>> {
1435        let abs_start = self.sr + rel_start;
1436        let abs_end = abs_start + len;
1437        let sheet = self.sheet();
1438        let chunk_starts = &sheet.chunk_starts;
1439
1440        let mut out_cols = Vec::with_capacity(self.cols);
1441        for col_idx in self.sc..=self.ec {
1442            if col_idx >= sheet.columns.len() {
1443                out_cols.push(None);
1444                continue;
1445            }
1446            let col = &sheet.columns[col_idx];
1447            let start_ch_idx = match chunk_starts.binary_search(&abs_start) {
1448                Ok(i) => i,
1449                Err(0) => 0,
1450                Err(i) => i - 1,
1451            };
1452
1453            let mut segments: Vec<Arc<arrow_array::StringArray>> = Vec::new();
1454            let mut null_only = true;
1455
1456            let mut curr = abs_start;
1457            let mut remaining = len;
1458            let mut ch_idx = start_ch_idx;
1459
1460            while remaining > 0 && ch_idx < chunk_starts.len() {
1461                let ch_start = chunk_starts[ch_idx];
1462                let ch_end = chunk_starts
1463                    .get(ch_idx + 1)
1464                    .copied()
1465                    .unwrap_or(sheet.nrows as usize);
1466                let ch_len = ch_end.saturating_sub(ch_start);
1467                if ch_len == 0 {
1468                    ch_idx += 1;
1469                    continue;
1470                }
1471
1472                let overlap_start = curr.max(ch_start);
1473                let overlap_end = ch_end.min(abs_end);
1474
1475                if overlap_start < overlap_end {
1476                    let seg_len = overlap_end - overlap_start;
1477                    let rel_off_in_chunk = overlap_start - ch_start;
1478
1479                    if let Some(ch) = col.chunk(ch_idx) {
1480                        let base_lowered = ch.text_lower_or_null();
1481                        let seg_range = rel_off_in_chunk..(rel_off_in_chunk + seg_len);
1482                        let has_overlay = ch.overlay.any_in_range(seg_range.clone())
1483                            || (!ch.computed_overlay.is_empty()
1484                                && ch.computed_overlay.any_in_range(seg_range.clone()));
1485
1486                        let final_arr = if has_overlay {
1487                            let mut sb = arrow_array::builder::StringBuilder::with_capacity(
1488                                seg_len,
1489                                seg_len * 8,
1490                            );
1491                            let mut mask_b =
1492                                arrow_array::builder::BooleanBuilder::with_capacity(seg_len);
1493                            for i in 0..seg_len {
1494                                if let Some(ov) = ch
1495                                    .overlay
1496                                    .get(rel_off_in_chunk + i)
1497                                    .or_else(|| ch.computed_overlay.get(rel_off_in_chunk + i))
1498                                {
1499                                    mask_b.append_value(true);
1500                                    match ov {
1501                                        arrow_store::OverlayValue::Text(s) => {
1502                                            sb.append_value(s.to_ascii_lowercase())
1503                                        }
1504                                        arrow_store::OverlayValue::Number(n) => {
1505                                            sb.append_value(n.to_string())
1506                                        }
1507                                        arrow_store::OverlayValue::Boolean(b) => {
1508                                            sb.append_value(if *b { "true" } else { "false" })
1509                                        }
1510                                        _ => sb.append_null(),
1511                                    }
1512                                } else {
1513                                    mask_b.append_value(false);
1514                                    sb.append_null();
1515                                }
1516                            }
1517                            let mask = mask_b.finish();
1518                            let overlay_vals = sb.finish();
1519                            let base_slice = base_lowered.slice(rel_off_in_chunk, seg_len);
1520                            let base_sa = base_slice
1521                                .as_any()
1522                                .downcast_ref::<arrow_array::StringArray>()
1523                                .unwrap();
1524                            let zipped =
1525                                crate::compute_prelude::zip_select(&mask, &overlay_vals, base_sa)
1526                                    .expect("zip text");
1527                            zipped
1528                                .as_any()
1529                                .downcast_ref::<arrow_array::StringArray>()
1530                                .unwrap()
1531                                .clone()
1532                        } else {
1533                            let sl = base_lowered.slice(rel_off_in_chunk, seg_len);
1534                            sl.as_any()
1535                                .downcast_ref::<arrow_array::StringArray>()
1536                                .unwrap()
1537                                .clone()
1538                        };
1539
1540                        if final_arr.null_count() < final_arr.len() {
1541                            null_only = false;
1542                        }
1543                        segments.push(Arc::new(final_arr));
1544                    } else {
1545                        segments.push(Arc::new(arrow_array::StringArray::new_null(seg_len)));
1546                    }
1547                    curr += seg_len;
1548                    remaining -= seg_len;
1549                }
1550                ch_idx += 1;
1551            }
1552
1553            if remaining > 0 {
1554                segments.push(Arc::new(arrow_array::StringArray::new_null(remaining)));
1555            }
1556
1557            if segments.len() == 1 {
1558                if null_only && segments[0].null_count() == segments[0].len() {
1559                    out_cols.push(None);
1560                } else {
1561                    out_cols.push(Some(segments.pop().unwrap()));
1562                }
1563            } else {
1564                let refs: Vec<&dyn Array> =
1565                    segments.iter().map(|a| a.as_ref() as &dyn Array).collect();
1566                let c = crate::compute_prelude::concat_arrays(&refs).expect("concat text");
1567                let sa = c
1568                    .as_any()
1569                    .downcast_ref::<arrow_array::StringArray>()
1570                    .unwrap()
1571                    .clone();
1572                out_cols.push(Some(Arc::new(sa)));
1573            }
1574        }
1575        out_cols
1576    }
1577}
1578
1579#[inline]
1580fn pack_numeric(v: &LiteralValue, policy: CoercionPolicy) -> Result<Option<f64>, ExcelError> {
1581    match policy {
1582        CoercionPolicy::NumberLenientText => match v {
1583            LiteralValue::Error(e) => Err(e.clone()),
1584            LiteralValue::Empty => Ok(None),
1585            other => Ok(crate::coercion::to_number_lenient(other).ok()),
1586        },
1587        CoercionPolicy::NumberStrict => match v {
1588            LiteralValue::Error(e) => Err(e.clone()),
1589            LiteralValue::Empty => Ok(None),
1590            other => Ok(crate::coercion::to_number_strict(other).ok()),
1591        },
1592        _ => match v {
1593            LiteralValue::Error(e) => Err(e.clone()),
1594            _ => Ok(None),
1595        },
1596    }
1597}
1598
#[cfg(test)]
mod tests {
    use super::*;

    /// Sums every number delivered by `numbers_chunked` over an owned 2x3 grid
    /// under the lenient-text policy and checks the total is 7.5.
    /// The expected total presumably corresponds to 1.0 + 3.0 + 2.5 plus 1.0
    /// for the boolean `true`, with the text and empty cells contributing nothing.
    #[test]
    fn owned_rows_numeric_chunking() {
        let first_row = vec![
            LiteralValue::Number(1.0),
            LiteralValue::Text("x".into()),
            LiteralValue::Number(3.0),
        ];
        let second_row = vec![
            LiteralValue::Boolean(true),
            LiteralValue::Empty,
            LiteralValue::Number(2.5),
        ];
        let grid: Vec<Vec<LiteralValue>> = vec![first_row, second_row];
        let view = RangeView::from_owned_rows(grid, DateSystem::Excel1900);

        let mut total = 0.0f64;
        let outcome = view.numbers_chunked(CoercionPolicy::NumberLenientText, 2, &mut |chunk| {
            for &value in chunk.data {
                total += value;
            }
            Ok(())
        });
        outcome.unwrap();

        assert!((total - 7.5).abs() < 1e-9);
    }

    /// A view built from a single cell exposes that cell through `as_1x1`.
    #[test]
    fn as_1x1_works() {
        let single_cell = vec![vec![LiteralValue::Number(7.0)]];
        let view = RangeView::from_owned_rows(single_cell, DateSystem::Excel1900);
        let scalar = view.as_1x1();
        assert_eq!(scalar, Some(LiteralValue::Number(7.0)));
    }
}