// formualizer_eval/arrow_store/mod.rs

1use crate::compute_prelude::{concat_arrays, zip_select};
2use arrow_array::Array;
3use arrow_array::new_null_array;
4use arrow_schema::DataType;
5use chrono::Timelike;
6use std::sync::Arc;
7
8use arrow_array::builder::{BooleanBuilder, Float64Builder, StringBuilder, UInt8Builder};
9use arrow_array::{ArrayRef, BooleanArray, Float64Array, StringArray, UInt8Array, UInt32Array};
10use once_cell::sync::OnceCell;
11
12use formualizer_common::{ExcelError, ExcelErrorKind, LiteralValue};
13use std::collections::HashMap;
14
/// One-byte discriminator stored per row to say what kind of value the cell holds.
///
/// The enum is `#[repr(u8)]` with explicit discriminants, so a tag can be written to a
/// `UInt8` array with `tag as u8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TypeTag {
    /// Blank cell.
    Empty = 0,
    /// Numeric value.
    Number = 1,
    /// Boolean value.
    Boolean = 2,
    /// Text value.
    Text = 3,
    /// Error value.
    Error = 4,
    /// Temporal value (marked as reserved for future temporal lanes).
    DateTime = 5,
    /// Duration value (reserved).
    Duration = 6,
    /// Value that is still pending.
    Pending = 7,
}
28
29impl TypeTag {
30    fn from_value(v: &LiteralValue) -> Self {
31        match v {
32            LiteralValue::Empty => TypeTag::Empty,
33            LiteralValue::Int(_) | LiteralValue::Number(_) => TypeTag::Number,
34            LiteralValue::Boolean(_) => TypeTag::Boolean,
35            LiteralValue::Text(_) => TypeTag::Text,
36            LiteralValue::Error(_) => TypeTag::Error,
37            LiteralValue::Date(_) | LiteralValue::DateTime(_) | LiteralValue::Time(_) => {
38                TypeTag::DateTime
39            }
40            LiteralValue::Duration(_) => TypeTag::Duration,
41            LiteralValue::Pending => TypeTag::Pending,
42            LiteralValue::Array(_) => TypeTag::Error, // arrays not storable in a single cell lane
43        }
44    }
45}
46
47impl TypeTag {
48    #[inline]
49    pub fn from_u8(b: u8) -> Self {
50        match b {
51            x if x == TypeTag::Empty as u8 => TypeTag::Empty,
52            x if x == TypeTag::Number as u8 => TypeTag::Number,
53            x if x == TypeTag::Boolean as u8 => TypeTag::Boolean,
54            x if x == TypeTag::Text as u8 => TypeTag::Text,
55            x if x == TypeTag::Error as u8 => TypeTag::Error,
56            x if x == TypeTag::DateTime as u8 => TypeTag::DateTime,
57            x if x == TypeTag::Duration as u8 => TypeTag::Duration,
58            x if x == TypeTag::Pending as u8 => TypeTag::Pending,
59            _ => TypeTag::Empty,
60        }
61    }
62}
63
/// Summary counters kept alongside a column chunk.
#[derive(Clone, Copy, Debug, Default)]
pub struct ColumnChunkMeta {
    /// Number of rows in the chunk.
    pub len: usize,
    /// Count of non-null entries in the numeric lane.
    pub non_null_num: usize,
    /// Count of non-null entries in the boolean lane.
    pub non_null_bool: usize,
    /// Count of non-null entries in the text lane.
    pub non_null_text: usize,
    /// Count of non-null entries in the error lane.
    pub non_null_err: usize,
}
72
73#[derive(Debug, Clone)]
74pub struct ColumnChunk {
75    pub numbers: Option<Arc<Float64Array>>,
76    pub booleans: Option<Arc<BooleanArray>>,
77    pub text: Option<ArrayRef>,          // Utf8 for Phase A
78    pub errors: Option<Arc<UInt8Array>>, // compact error code (UInt8)
79    pub type_tag: Arc<UInt8Array>,
80    pub formula_id: Option<Arc<UInt32Array>>, // reserved for Phase A+
81    pub meta: ColumnChunkMeta,
82    // Lazy null providers (per-chunk)
83    lazy_null_numbers: OnceCell<Arc<Float64Array>>,
84    lazy_null_booleans: OnceCell<Arc<BooleanArray>>,
85    lazy_null_text: OnceCell<ArrayRef>,
86    lazy_null_errors: OnceCell<Arc<UInt8Array>>,
87    // Cache: lowered text lane (ASCII lower), nulls preserved
88    lowered_text: OnceCell<ArrayRef>,
89    // Phase C: per-chunk overlay (delta edits since last compaction)
90    pub overlay: Overlay,
91}
92
93impl ColumnChunk {
94    #[inline]
95    pub fn len(&self) -> usize {
96        self.type_tag.len()
97    }
98    #[inline]
99    pub fn is_empty(&self) -> bool {
100        self.len() == 0
101    }
102    #[inline]
103    pub fn numbers_or_null(&self) -> Arc<Float64Array> {
104        if let Some(a) = &self.numbers {
105            return a.clone();
106        }
107        self.lazy_null_numbers
108            .get_or_init(|| {
109                let arr = new_null_array(&DataType::Float64, self.len());
110                Arc::new(arr.as_any().downcast_ref::<Float64Array>().unwrap().clone())
111            })
112            .clone()
113    }
114    #[inline]
115    pub fn booleans_or_null(&self) -> Arc<BooleanArray> {
116        if let Some(a) = &self.booleans {
117            return a.clone();
118        }
119        self.lazy_null_booleans
120            .get_or_init(|| {
121                let arr = new_null_array(&DataType::Boolean, self.len());
122                Arc::new(arr.as_any().downcast_ref::<BooleanArray>().unwrap().clone())
123            })
124            .clone()
125    }
126    #[inline]
127    pub fn errors_or_null(&self) -> Arc<UInt8Array> {
128        if let Some(a) = &self.errors {
129            return a.clone();
130        }
131        self.lazy_null_errors
132            .get_or_init(|| {
133                let arr = new_null_array(&DataType::UInt8, self.len());
134                Arc::new(arr.as_any().downcast_ref::<UInt8Array>().unwrap().clone())
135            })
136            .clone()
137    }
138    #[inline]
139    pub fn text_or_null(&self) -> ArrayRef {
140        if let Some(a) = &self.text {
141            return a.clone();
142        }
143        self.lazy_null_text
144            .get_or_init(|| new_null_array(&DataType::Utf8, self.len()))
145            .clone()
146    }
147
148    /// Lowercased text lane (ASCII lower), with nulls preserved. Cached per chunk.
149    pub fn text_lower_or_null(&self) -> ArrayRef {
150        if let Some(a) = self.lowered_text.get() {
151            return a.clone();
152        }
153        // Lowercase when text present; else return null Utf8
154        let out: ArrayRef = if let Some(txt) = &self.text {
155            let sa = txt.as_any().downcast_ref::<StringArray>().unwrap();
156            let mut b = arrow_array::builder::StringBuilder::with_capacity(sa.len(), sa.len() * 8);
157            for i in 0..sa.len() {
158                if sa.is_null(i) {
159                    b.append_null();
160                } else {
161                    b.append_value(sa.value(i).to_ascii_lowercase());
162                }
163            }
164            let lowered = b.finish();
165            Arc::new(lowered)
166        } else {
167            new_null_array(&DataType::Utf8, self.len())
168        };
169        self.lowered_text.get_or_init(|| out.clone());
170        out
171    }
172}
173
174#[derive(Debug, Clone)]
175pub struct ArrowColumn {
176    pub chunks: Vec<ColumnChunk>,
177    pub index: u32,
178}
179
180#[derive(Debug, Clone)]
181pub struct ArrowSheet {
182    pub name: Arc<str>,
183    pub columns: Vec<ArrowColumn>,
184    pub nrows: u32,
185    pub chunk_starts: Vec<usize>,
186}
187
188#[derive(Debug, Default, Clone)]
189pub struct SheetStore {
190    pub sheets: Vec<ArrowSheet>,
191}
192
193impl SheetStore {
194    pub fn sheet(&self, name: &str) -> Option<&ArrowSheet> {
195        self.sheets.iter().find(|s| s.name.as_ref() == name)
196    }
197    pub fn sheet_mut(&mut self, name: &str) -> Option<&mut ArrowSheet> {
198        self.sheets.iter_mut().find(|s| s.name.as_ref() == name)
199    }
200}
201
202/// Ingestion builder that writes per-column Arrow arrays with a lane/tag design.
203pub struct IngestBuilder {
204    name: Arc<str>,
205    ncols: usize,
206    chunk_rows: usize,
207    date_system: crate::engine::DateSystem,
208
209    // Per-column active builders for current chunk
210    num_builders: Vec<Float64Builder>,
211    bool_builders: Vec<BooleanBuilder>,
212    text_builders: Vec<StringBuilder>,
213    err_builders: Vec<UInt8Builder>,
214    tag_builders: Vec<UInt8Builder>,
215
216    // Per-column per-lane non-null counters for current chunk
217    lane_counts: Vec<LaneCounts>,
218
219    // Accumulated chunks
220    chunks: Vec<Vec<ColumnChunk>>, // indexed by col
221    row_in_chunk: usize,
222    total_rows: u32,
223}
224
/// Running count of non-null entries written to each value lane of one column.
#[derive(Clone, Copy, Debug, Default)]
struct LaneCounts {
    /// Entries written to the numeric lane.
    n_num: usize,
    /// Entries written to the boolean lane.
    n_bool: usize,
    /// Entries written to the text lane.
    n_text: usize,
    /// Entries written to the error lane.
    n_err: usize,
}
232
233impl IngestBuilder {
234    pub fn new(
235        sheet_name: &str,
236        ncols: usize,
237        chunk_rows: usize,
238        date_system: crate::engine::DateSystem,
239    ) -> Self {
240        let mut chunks = Vec::with_capacity(ncols);
241        chunks.resize_with(ncols, Vec::new);
242        Self {
243            name: Arc::from(sheet_name.to_string()),
244            ncols,
245            chunk_rows: chunk_rows.max(1),
246            date_system,
247            num_builders: (0..ncols)
248                .map(|_| Float64Builder::with_capacity(chunk_rows))
249                .collect(),
250            bool_builders: (0..ncols)
251                .map(|_| BooleanBuilder::with_capacity(chunk_rows))
252                .collect(),
253            text_builders: (0..ncols)
254                .map(|_| StringBuilder::with_capacity(chunk_rows, chunk_rows * 12))
255                .collect(),
256            err_builders: (0..ncols)
257                .map(|_| UInt8Builder::with_capacity(chunk_rows))
258                .collect(),
259            tag_builders: (0..ncols)
260                .map(|_| UInt8Builder::with_capacity(chunk_rows))
261                .collect(),
262            lane_counts: vec![LaneCounts::default(); ncols],
263            chunks,
264            row_in_chunk: 0,
265            total_rows: 0,
266        }
267    }
268
269    /// Zero-allocation row append from typed cell tokens (no LiteralValue).
270    /// Text borrows are copied into the internal StringBuilder.
271    pub fn append_row_cells<'a>(&mut self, row: &[CellIngest<'a>]) -> Result<(), ExcelError> {
272        assert_eq!(row.len(), self.ncols, "row width mismatch");
273        for (c, cell) in row.iter().enumerate() {
274            match cell {
275                CellIngest::Empty => {
276                    self.tag_builders[c].append_value(TypeTag::Empty as u8);
277                    self.num_builders[c].append_null();
278                    self.bool_builders[c].append_null();
279                    self.text_builders[c].append_null();
280                    self.err_builders[c].append_null();
281                }
282                CellIngest::Number(n) => {
283                    self.tag_builders[c].append_value(TypeTag::Number as u8);
284                    self.num_builders[c].append_value(*n);
285                    self.lane_counts[c].n_num += 1;
286                    self.bool_builders[c].append_null();
287                    self.text_builders[c].append_null();
288                    self.err_builders[c].append_null();
289                }
290                CellIngest::Boolean(b) => {
291                    self.tag_builders[c].append_value(TypeTag::Boolean as u8);
292                    self.num_builders[c].append_null();
293                    self.bool_builders[c].append_value(*b);
294                    self.lane_counts[c].n_bool += 1;
295                    self.text_builders[c].append_null();
296                    self.err_builders[c].append_null();
297                }
298                CellIngest::Text(s) => {
299                    self.tag_builders[c].append_value(TypeTag::Text as u8);
300                    self.num_builders[c].append_null();
301                    self.bool_builders[c].append_null();
302                    self.text_builders[c].append_value(s);
303                    self.lane_counts[c].n_text += 1;
304                    self.err_builders[c].append_null();
305                }
306                CellIngest::ErrorCode(code) => {
307                    self.tag_builders[c].append_value(TypeTag::Error as u8);
308                    self.num_builders[c].append_null();
309                    self.bool_builders[c].append_null();
310                    self.text_builders[c].append_null();
311                    self.err_builders[c].append_value(*code);
312                    self.lane_counts[c].n_err += 1;
313                }
314                CellIngest::DateSerial(serial) => {
315                    self.tag_builders[c].append_value(TypeTag::DateTime as u8);
316                    self.num_builders[c].append_value(*serial);
317                    self.lane_counts[c].n_num += 1;
318                    self.bool_builders[c].append_null();
319                    self.text_builders[c].append_null();
320                    self.err_builders[c].append_null();
321                }
322                CellIngest::Pending => {
323                    self.tag_builders[c].append_value(TypeTag::Pending as u8);
324                    self.num_builders[c].append_null();
325                    self.bool_builders[c].append_null();
326                    self.text_builders[c].append_null();
327                    self.err_builders[c].append_null();
328                }
329            }
330        }
331        self.row_in_chunk += 1;
332        self.total_rows += 1;
333        if self.row_in_chunk >= self.chunk_rows {
334            self.finish_chunk();
335        }
336        Ok(())
337    }
338
339    /// Streaming row append from an iterator of typed cell tokens.
340    /// Requires an `ExactSizeIterator` to validate row width without materializing a Vec.
341    pub fn append_row_cells_iter<'a, I>(&mut self, iter: I) -> Result<(), ExcelError>
342    where
343        I: ExactSizeIterator<Item = CellIngest<'a>>,
344    {
345        assert_eq!(iter.len(), self.ncols, "row width mismatch");
346        for (c, cell) in iter.enumerate() {
347            match cell {
348                CellIngest::Empty => {
349                    self.tag_builders[c].append_value(TypeTag::Empty as u8);
350                    self.num_builders[c].append_null();
351                    self.bool_builders[c].append_null();
352                    self.text_builders[c].append_null();
353                    self.err_builders[c].append_null();
354                }
355                CellIngest::Number(n) => {
356                    self.tag_builders[c].append_value(TypeTag::Number as u8);
357                    self.num_builders[c].append_value(n);
358                    self.lane_counts[c].n_num += 1;
359                    self.bool_builders[c].append_null();
360                    self.text_builders[c].append_null();
361                    self.err_builders[c].append_null();
362                }
363                CellIngest::Boolean(b) => {
364                    self.tag_builders[c].append_value(TypeTag::Boolean as u8);
365                    self.num_builders[c].append_null();
366                    self.bool_builders[c].append_value(b);
367                    self.lane_counts[c].n_bool += 1;
368                    self.text_builders[c].append_null();
369                    self.err_builders[c].append_null();
370                }
371                CellIngest::Text(s) => {
372                    self.tag_builders[c].append_value(TypeTag::Text as u8);
373                    self.num_builders[c].append_null();
374                    self.bool_builders[c].append_null();
375                    self.text_builders[c].append_value(s);
376                    self.lane_counts[c].n_text += 1;
377                    self.err_builders[c].append_null();
378                }
379                CellIngest::ErrorCode(code) => {
380                    self.tag_builders[c].append_value(TypeTag::Error as u8);
381                    self.num_builders[c].append_null();
382                    self.bool_builders[c].append_null();
383                    self.text_builders[c].append_null();
384                    self.err_builders[c].append_value(code);
385                    self.lane_counts[c].n_err += 1;
386                }
387                CellIngest::DateSerial(serial) => {
388                    self.tag_builders[c].append_value(TypeTag::DateTime as u8);
389                    self.num_builders[c].append_value(serial);
390                    self.lane_counts[c].n_num += 1;
391                    self.bool_builders[c].append_null();
392                    self.text_builders[c].append_null();
393                    self.err_builders[c].append_null();
394                }
395                CellIngest::Pending => {
396                    self.tag_builders[c].append_value(TypeTag::Pending as u8);
397                    self.num_builders[c].append_null();
398                    self.bool_builders[c].append_null();
399                    self.text_builders[c].append_null();
400                    self.err_builders[c].append_null();
401                }
402            }
403        }
404        self.row_in_chunk += 1;
405        self.total_rows += 1;
406        if self.row_in_chunk >= self.chunk_rows {
407            self.finish_chunk();
408        }
409        Ok(())
410    }
411
412    /// Append a single row of values. Length must match `ncols`.
413    pub fn append_row(&mut self, row: &[LiteralValue]) -> Result<(), ExcelError> {
414        assert_eq!(row.len(), self.ncols, "row width mismatch");
415
416        for (c, v) in row.iter().enumerate() {
417            let tag = TypeTag::from_value(v) as u8;
418            self.tag_builders[c].append_value(tag);
419
420            match v {
421                LiteralValue::Empty => {
422                    self.num_builders[c].append_null();
423                    self.bool_builders[c].append_null();
424                    self.text_builders[c].append_null();
425                    self.err_builders[c].append_null();
426                }
427                LiteralValue::Int(i) => {
428                    self.num_builders[c].append_value(*i as f64);
429                    self.lane_counts[c].n_num += 1;
430                    self.bool_builders[c].append_null();
431                    self.text_builders[c].append_null();
432                    self.err_builders[c].append_null();
433                }
434                LiteralValue::Number(n) => {
435                    self.num_builders[c].append_value(*n);
436                    self.lane_counts[c].n_num += 1;
437                    self.bool_builders[c].append_null();
438                    self.text_builders[c].append_null();
439                    self.err_builders[c].append_null();
440                }
441                LiteralValue::Boolean(b) => {
442                    self.num_builders[c].append_null();
443                    self.bool_builders[c].append_value(*b);
444                    self.lane_counts[c].n_bool += 1;
445                    self.text_builders[c].append_null();
446                    self.err_builders[c].append_null();
447                }
448                LiteralValue::Text(s) => {
449                    self.num_builders[c].append_null();
450                    self.bool_builders[c].append_null();
451                    self.text_builders[c].append_value(s);
452                    self.lane_counts[c].n_text += 1;
453                    self.err_builders[c].append_null();
454                }
455                LiteralValue::Error(e) => {
456                    self.num_builders[c].append_null();
457                    self.bool_builders[c].append_null();
458                    self.text_builders[c].append_null();
459                    self.err_builders[c].append_value(map_error_code(e.kind));
460                    self.lane_counts[c].n_err += 1;
461                }
462                // Phase A: coerce temporal to serials in numeric lane with DateTime tag
463                LiteralValue::Date(d) => {
464                    let dt = d.and_hms_opt(0, 0, 0).unwrap();
465                    let serial =
466                        crate::builtins::datetime::datetime_to_serial_for(self.date_system, &dt);
467                    self.num_builders[c].append_value(serial);
468                    self.lane_counts[c].n_num += 1;
469                    self.bool_builders[c].append_null();
470                    self.text_builders[c].append_null();
471                    self.err_builders[c].append_null();
472                }
473                LiteralValue::DateTime(dt) => {
474                    let serial =
475                        crate::builtins::datetime::datetime_to_serial_for(self.date_system, dt);
476                    self.num_builders[c].append_value(serial);
477                    self.lane_counts[c].n_num += 1;
478                    self.bool_builders[c].append_null();
479                    self.text_builders[c].append_null();
480                    self.err_builders[c].append_null();
481                }
482                LiteralValue::Time(t) => {
483                    let serial = t.num_seconds_from_midnight() as f64 / 86_400.0;
484                    self.num_builders[c].append_value(serial);
485                    self.lane_counts[c].n_num += 1;
486                    self.bool_builders[c].append_null();
487                    self.text_builders[c].append_null();
488                    self.err_builders[c].append_null();
489                }
490                LiteralValue::Duration(dur) => {
491                    let serial = dur.num_seconds() as f64 / 86_400.0;
492                    self.num_builders[c].append_value(serial);
493                    self.lane_counts[c].n_num += 1;
494                    self.bool_builders[c].append_null();
495                    self.text_builders[c].append_null();
496                    self.err_builders[c].append_null();
497                }
498                LiteralValue::Array(_) => {
499                    // Not allowed as a stored scalar; mark as error kind VALUE
500                    self.num_builders[c].append_null();
501                    self.bool_builders[c].append_null();
502                    self.text_builders[c].append_null();
503                    self.err_builders[c].append_value(map_error_code(ExcelErrorKind::Value));
504                    self.lane_counts[c].n_err += 1;
505                }
506                LiteralValue::Pending => {
507                    // Pending: tag only; all lanes remain null (no error)
508                    self.num_builders[c].append_null();
509                    self.bool_builders[c].append_null();
510                    self.text_builders[c].append_null();
511                    self.err_builders[c].append_null();
512                }
513            }
514        }
515
516        self.row_in_chunk += 1;
517        self.total_rows += 1;
518
519        if self.row_in_chunk >= self.chunk_rows {
520            self.finish_chunk();
521        }
522
523        Ok(())
524    }
525
526    fn finish_chunk(&mut self) {
527        if self.row_in_chunk == 0 {
528            return;
529        }
530        for c in 0..self.ncols {
531            let len = self.row_in_chunk;
532            let numbers_arc: Option<Arc<Float64Array>> = if self.lane_counts[c].n_num == 0 {
533                None
534            } else {
535                Some(Arc::new(self.num_builders[c].finish()))
536            };
537            let booleans_arc: Option<Arc<BooleanArray>> = if self.lane_counts[c].n_bool == 0 {
538                None
539            } else {
540                Some(Arc::new(self.bool_builders[c].finish()))
541            };
542            let text_ref: Option<ArrayRef> = if self.lane_counts[c].n_text == 0 {
543                None
544            } else {
545                Some(Arc::new(self.text_builders[c].finish()))
546            };
547            let errors_arc: Option<Arc<UInt8Array>> = if self.lane_counts[c].n_err == 0 {
548                None
549            } else {
550                Some(Arc::new(self.err_builders[c].finish()))
551            };
552            let tags: UInt8Array = self.tag_builders[c].finish();
553
554            let chunk = ColumnChunk {
555                numbers: numbers_arc,
556                booleans: booleans_arc,
557                text: text_ref,
558                errors: errors_arc,
559                type_tag: Arc::new(tags),
560                formula_id: None,
561                meta: ColumnChunkMeta {
562                    len,
563                    non_null_num: self.lane_counts[c].n_num,
564                    non_null_bool: self.lane_counts[c].n_bool,
565                    non_null_text: self.lane_counts[c].n_text,
566                    non_null_err: self.lane_counts[c].n_err,
567                },
568                lazy_null_numbers: OnceCell::new(),
569                lazy_null_booleans: OnceCell::new(),
570                lazy_null_text: OnceCell::new(),
571                lazy_null_errors: OnceCell::new(),
572                lowered_text: OnceCell::new(),
573                overlay: Overlay::new(),
574            };
575            self.chunks[c].push(chunk);
576
577            // re-init builders for next chunk
578            self.num_builders[c] = Float64Builder::with_capacity(self.chunk_rows);
579            self.bool_builders[c] = BooleanBuilder::with_capacity(self.chunk_rows);
580            self.text_builders[c] =
581                StringBuilder::with_capacity(self.chunk_rows, self.chunk_rows * 12);
582            self.err_builders[c] = UInt8Builder::with_capacity(self.chunk_rows);
583            self.tag_builders[c] = UInt8Builder::with_capacity(self.chunk_rows);
584            self.lane_counts[c] = LaneCounts::default();
585        }
586        self.row_in_chunk = 0;
587    }
588
589    pub fn finish(mut self) -> ArrowSheet {
590        // flush partial chunk
591        if self.row_in_chunk > 0 {
592            self.finish_chunk();
593        }
594
595        let mut columns = Vec::with_capacity(self.ncols);
596        for (idx, chunks) in self.chunks.into_iter().enumerate() {
597            columns.push(ArrowColumn {
598                chunks,
599                index: idx as u32,
600            });
601        }
602        // Precompute chunk starts from first column and enforce alignment across columns
603        let mut chunk_starts: Vec<usize> = Vec::new();
604        if let Some(col0) = columns.first() {
605            let chunks_len0 = col0.chunks.len();
606            for (ci, col) in columns.iter().enumerate() {
607                if col.chunks.len() != chunks_len0 {
608                    panic!(
609                        "ArrowSheet chunk misalignment: column {} chunks={} != {}",
610                        ci,
611                        col.chunks.len(),
612                        chunks_len0
613                    );
614                }
615            }
616            let mut cur = 0usize;
617            for i in 0..chunks_len0 {
618                let len_i = col0.chunks[i].type_tag.len();
619                for (ci, col) in columns.iter().enumerate() {
620                    let got = col.chunks[i].type_tag.len();
621                    if got != len_i {
622                        panic!(
623                            "ArrowSheet chunk row-length misalignment at chunk {i}: col {ci} len={got} != {len_i}"
624                        );
625                    }
626                }
627                chunk_starts.push(cur);
628                cur += len_i;
629            }
630        }
631        ArrowSheet {
632            name: self.name,
633            columns,
634            nrows: self.total_rows,
635            chunk_starts,
636        }
637    }
638}
639
640pub fn map_error_code(kind: ExcelErrorKind) -> u8 {
641    match kind {
642        ExcelErrorKind::Null => 1,
643        ExcelErrorKind::Ref => 2,
644        ExcelErrorKind::Name => 3,
645        ExcelErrorKind::Value => 4,
646        ExcelErrorKind::Div => 5,
647        ExcelErrorKind::Na => 6,
648        ExcelErrorKind::Num => 7,
649        ExcelErrorKind::Error => 8,
650        ExcelErrorKind::NImpl => 9,
651        ExcelErrorKind::Spill => 10,
652        ExcelErrorKind::Calc => 11,
653        ExcelErrorKind::Circ => 12,
654        ExcelErrorKind::Cancelled => 13,
655    }
656}
657
658pub fn unmap_error_code(code: u8) -> ExcelErrorKind {
659    match code {
660        1 => ExcelErrorKind::Null,
661        2 => ExcelErrorKind::Ref,
662        3 => ExcelErrorKind::Name,
663        4 => ExcelErrorKind::Value,
664        5 => ExcelErrorKind::Div,
665        6 => ExcelErrorKind::Na,
666        7 => ExcelErrorKind::Num,
667        8 => ExcelErrorKind::Error,
668        9 => ExcelErrorKind::NImpl,
669        10 => ExcelErrorKind::Spill,
670        11 => ExcelErrorKind::Calc,
671        12 => ExcelErrorKind::Circ,
672        13 => ExcelErrorKind::Cancelled,
673        _ => ExcelErrorKind::Error,
674    }
675}
676
677// ─────────────────────────── Overlay (Phase C) ────────────────────────────
678
/// Borrowed, allocation-free description of one cell handed to the ingest builder.
pub enum CellIngest<'a> {
    /// Blank cell.
    Empty,
    /// Numeric value.
    Number(f64),
    /// Boolean value.
    Boolean(bool),
    /// Text borrowed from the caller.
    Text(&'a str),
    /// Compact error code.
    ErrorCode(u8),
    /// Date/time already converted to a serial number.
    DateSerial(f64),
    /// Value that is still pending.
    Pending,
}
689
/// A replacement value recorded in an [`Overlay`] for a single offset.
#[derive(Debug, Clone)]
pub enum OverlayValue {
    /// Blank cell.
    Empty,
    /// Numeric value.
    Number(f64),
    /// Boolean value.
    Boolean(bool),
    /// Shared text value.
    Text(Arc<str>),
    /// Compact error code.
    Error(u8),
    /// Value that is still pending.
    Pending,
}

/// Sparse map from offset to [`OverlayValue`].
// NOTE(review): keys are presumably in-chunk row offsets, since each chunk owns its own
// overlay; confirm against the callers.
#[derive(Debug, Default, Clone)]
pub struct Overlay {
    map: HashMap<usize, OverlayValue>,
}

impl Overlay {
    /// Create an empty overlay.
    pub fn new() -> Self {
        Self::default()
    }

    /// Value recorded at `off`, if any.
    #[inline]
    pub fn get(&self, off: usize) -> Option<&OverlayValue> {
        self.map.get(&off)
    }

    /// Record `v` at `off`, replacing any previous value there.
    #[inline]
    pub fn set(&mut self, off: usize, v: OverlayValue) {
        self.map.insert(off, v);
    }

    /// Remove every recorded value.
    #[inline]
    pub fn clear(&mut self) {
        self.map.clear();
    }

    /// Number of offsets with a recorded value.
    #[inline]
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// `true` when nothing is recorded.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// `true` when at least one recorded offset lies inside `range`.
    ///
    /// An empty or inverted range never matches. The check walks whichever is smaller,
    /// the range or the map, so a narrow query against a large overlay costs only a few
    /// hash lookups.
    #[inline]
    pub fn any_in_range(&self, range: core::ops::Range<usize>) -> bool {
        if range.len() < self.map.len() {
            // Fewer candidate offsets than entries: probe each offset.
            (range.start..range.end).any(|off| self.map.contains_key(&off))
        } else {
            self.map.keys().any(|k| range.contains(k))
        }
    }
}
736
737/// A lightweight view over a rectangular range in an `ArrowSheet`.
738/// Coordinates are 0-based and inclusive.
739pub struct ArrowRangeView<'a> {
740    sheet: &'a ArrowSheet,
741    sr: usize,
742    sc: usize,
743    er: usize,
744    ec: usize,
745    rows: usize,
746    cols: usize,
747    chunk_starts: &'a [usize],
748}
749
750impl ArrowSheet {
751    /// Return a summary of each column's chunk counts, total rows, and lane presence.
752    pub fn shape(&self) -> Vec<ColumnShape> {
753        self.columns
754            .iter()
755            .map(|c| {
756                let chunks = c.chunks.len();
757                let rows = self.nrows as usize;
758                let has_num = c.chunks.iter().any(|ch| ch.meta.non_null_num > 0);
759                let has_bool = c.chunks.iter().any(|ch| ch.meta.non_null_bool > 0);
760                let has_text = c.chunks.iter().any(|ch| ch.meta.non_null_text > 0);
761                let has_err = c.chunks.iter().any(|ch| ch.meta.non_null_err > 0);
762                ColumnShape {
763                    index: c.index,
764                    chunks,
765                    rows,
766                    has_num,
767                    has_bool,
768                    has_text,
769                    has_err,
770                }
771            })
772            .collect()
773    }
774    pub fn range_view(&self, sr: usize, sc: usize, er: usize, ec: usize) -> ArrowRangeView<'_> {
775        let r0 = er.checked_sub(sr).map(|d| d + 1).unwrap_or(0);
776        let c0 = ec.checked_sub(sc).map(|d| d + 1).unwrap_or(0);
777        let (rows, cols) = if r0 == 0 || c0 == 0 { (0, 0) } else { (r0, c0) };
778        ArrowRangeView {
779            sheet: self,
780            sr,
781            sc,
782            er,
783            ec,
784            rows,
785            cols,
786            chunk_starts: &self.chunk_starts,
787        }
788    }
789
790    /// Ensure capacity to address at least target_rows rows by appending empty chunks.
791    pub fn ensure_row_capacity(&mut self, target_rows: usize) {
792        if target_rows as u32 <= self.nrows {
793            return;
794        }
795        // Determine chunk size from last chunk, fallback to 32k
796        let mut chunk_size = 32 * 1024;
797        if let Some(c0) = self.columns.first() {
798            if let Some(last) = c0.chunks.last() {
799                chunk_size = last.type_tag.len().max(1);
800            }
801        }
802        let mut cur_rows = self.nrows as usize;
803        while cur_rows < target_rows {
804            let len = (target_rows - cur_rows).min(chunk_size);
805            // Append chunk_starts entry
806            self.chunk_starts.push(cur_rows);
807            for col in &mut self.columns {
808                let tags = UInt8Array::from(vec![TypeTag::Empty as u8; len]);
809                col.chunks.push(ColumnChunk {
810                    numbers: None,
811                    booleans: None,
812                    text: None,
813                    errors: None,
814                    type_tag: Arc::new(tags),
815                    formula_id: None,
816                    meta: ColumnChunkMeta {
817                        len,
818                        non_null_num: 0,
819                        non_null_bool: 0,
820                        non_null_text: 0,
821                        non_null_err: 0,
822                    },
823                    lazy_null_numbers: OnceCell::new(),
824                    lazy_null_booleans: OnceCell::new(),
825                    lazy_null_text: OnceCell::new(),
826                    lazy_null_errors: OnceCell::new(),
827                    lowered_text: OnceCell::new(),
828                    overlay: Overlay::new(),
829                });
830            }
831            cur_rows += len;
832            self.nrows = cur_rows as u32;
833        }
834    }
835
836    /// Return (chunk_idx, in_chunk_offset) for absolute 0-based row.
837    pub fn chunk_of_row(&self, abs_row: usize) -> Option<(usize, usize)> {
838        if abs_row >= self.nrows as usize {
839            return None;
840        }
841        let ch_idx = match self.chunk_starts.binary_search(&abs_row) {
842            Ok(i) => i,
843            Err(0) => 0,
844            Err(i) => i - 1,
845        };
846        let start = self.chunk_starts[ch_idx];
847        Some((ch_idx, abs_row - start))
848    }
849
850    fn recompute_chunk_starts(&mut self) {
851        self.chunk_starts.clear();
852        if let Some(col0) = self.columns.first() {
853            let mut cur = 0usize;
854            for ch in &col0.chunks {
855                self.chunk_starts.push(cur);
856                cur += ch.type_tag.len();
857            }
858        }
859    }
860
861    fn make_empty_chunk(len: usize) -> ColumnChunk {
862        ColumnChunk {
863            numbers: None,
864            booleans: None,
865            text: None,
866            errors: None,
867            type_tag: Arc::new(UInt8Array::from(vec![TypeTag::Empty as u8; len])),
868            formula_id: None,
869            meta: ColumnChunkMeta {
870                len,
871                non_null_num: 0,
872                non_null_bool: 0,
873                non_null_text: 0,
874                non_null_err: 0,
875            },
876            lazy_null_numbers: OnceCell::new(),
877            lazy_null_booleans: OnceCell::new(),
878            lazy_null_text: OnceCell::new(),
879            lazy_null_errors: OnceCell::new(),
880            lowered_text: OnceCell::new(),
881            overlay: Overlay::new(),
882        }
883    }
884
885    fn slice_chunk(ch: &ColumnChunk, off: usize, len: usize) -> ColumnChunk {
886        // Slice type tags
887        use arrow_array::Array;
888        let type_tag: Arc<UInt8Array> = Arc::new(
889            Array::slice(ch.type_tag.as_ref(), off, len)
890                .as_any()
891                .downcast_ref::<UInt8Array>()
892                .unwrap()
893                .clone(),
894        );
895        // Slice numbers if present and keep only if any non-null
896        let numbers: Option<Arc<Float64Array>> = ch.numbers.as_ref().and_then(|a| {
897            let sl = Array::slice(a.as_ref(), off, len);
898            let fa = sl.as_any().downcast_ref::<Float64Array>().unwrap().clone();
899            let nn = len.saturating_sub(fa.null_count());
900            if nn == 0 { None } else { Some(Arc::new(fa)) }
901        });
902        let booleans: Option<Arc<BooleanArray>> = ch.booleans.as_ref().and_then(|a| {
903            let sl = Array::slice(a.as_ref(), off, len);
904            let ba = sl.as_any().downcast_ref::<BooleanArray>().unwrap().clone();
905            let nn = len.saturating_sub(ba.null_count());
906            if nn == 0 { None } else { Some(Arc::new(ba)) }
907        });
908        let text: Option<ArrayRef> = ch.text.as_ref().and_then(|a| {
909            let sl = Array::slice(a.as_ref(), off, len);
910            let sa = sl.as_any().downcast_ref::<StringArray>().unwrap().clone();
911            let nn = len.saturating_sub(sa.null_count());
912            if nn == 0 {
913                None
914            } else {
915                Some(Arc::new(sa) as ArrayRef)
916            }
917        });
918        let errors: Option<Arc<UInt8Array>> = ch.errors.as_ref().and_then(|a| {
919            let sl = Array::slice(a.as_ref(), off, len);
920            let ea = sl.as_any().downcast_ref::<UInt8Array>().unwrap().clone();
921            let nn = len.saturating_sub(ea.null_count());
922            if nn == 0 { None } else { Some(Arc::new(ea)) }
923        });
924        // Split overlays for this slice
925        let mut overlay = Overlay::new();
926        for (k, v) in ch.overlay.map.iter() {
927            if *k >= off && *k < off + len {
928                overlay.set(*k - off, v.clone());
929            }
930        }
931        let non_null_num = numbers.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
932        let non_null_bool = booleans.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
933        let non_null_text = text.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
934        let non_null_err = errors.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
935        ColumnChunk {
936            numbers: numbers.clone(),
937            booleans: booleans.clone(),
938            text: text.clone(),
939            errors: errors.clone(),
940            type_tag,
941            formula_id: None,
942            meta: ColumnChunkMeta {
943                len,
944                non_null_num,
945                non_null_bool,
946                non_null_text,
947                non_null_err,
948            },
949            lazy_null_numbers: OnceCell::new(),
950            lazy_null_booleans: OnceCell::new(),
951            lazy_null_text: OnceCell::new(),
952            lazy_null_errors: OnceCell::new(),
953            lowered_text: OnceCell::new(),
954            overlay,
955        }
956    }
957
958    /// Heuristic compaction: rebuilds a chunk's base arrays by applying its overlay when
959    /// overlay density crosses thresholds. Returns true if a rebuild occurred.
960    pub fn maybe_compact_chunk(
961        &mut self,
962        col_idx: usize,
963        ch_idx: usize,
964        abs_threshold: usize,
965        frac_den: usize,
966    ) -> bool {
967        if col_idx >= self.columns.len() || self.columns[col_idx].chunks.len() <= ch_idx {
968            return false;
969        }
970        let ch = &self.columns[col_idx].chunks[ch_idx];
971        let len = ch.type_tag.len();
972        if len == 0 {
973            return false;
974        }
975        let ov_len = ch.overlay.len();
976        let den = if frac_den.max(1) == 0 {
977            1
978        } else {
979            frac_den.max(1)
980        };
981        let trig = ov_len > (len / den) || ov_len > abs_threshold;
982        if !trig {
983            return false;
984        }
985        // Rebuild: merge base lanes with overlays row-by-row
986        let mut tag_b = UInt8Builder::with_capacity(len);
987        let mut nb = Float64Builder::with_capacity(len);
988        let mut bb = BooleanBuilder::with_capacity(len);
989        let mut sb = StringBuilder::with_capacity(len, len * 8);
990        let mut eb = UInt8Builder::with_capacity(len);
991        let mut non_num = 0usize;
992        let mut non_bool = 0usize;
993        let mut non_text = 0usize;
994        let mut non_err = 0usize;
995
996        let ch_ref = &self.columns[col_idx].chunks[ch_idx];
997        for i in 0..len {
998            // If overlay present, use it. Otherwise, use base tag+lane
999            if let Some(ov) = ch_ref.overlay.get(i) {
1000                match ov {
1001                    OverlayValue::Empty => {
1002                        tag_b.append_value(TypeTag::Empty as u8);
1003                        nb.append_null();
1004                        bb.append_null();
1005                        sb.append_null();
1006                        eb.append_null();
1007                    }
1008                    OverlayValue::Number(n) => {
1009                        tag_b.append_value(TypeTag::Number as u8);
1010                        nb.append_value(*n);
1011                        non_num += 1;
1012                        bb.append_null();
1013                        sb.append_null();
1014                        eb.append_null();
1015                    }
1016                    OverlayValue::Boolean(b) => {
1017                        tag_b.append_value(TypeTag::Boolean as u8);
1018                        nb.append_null();
1019                        bb.append_value(*b);
1020                        non_bool += 1;
1021                        sb.append_null();
1022                        eb.append_null();
1023                    }
1024                    OverlayValue::Text(s) => {
1025                        tag_b.append_value(TypeTag::Text as u8);
1026                        nb.append_null();
1027                        bb.append_null();
1028                        sb.append_value(s);
1029                        non_text += 1;
1030                        eb.append_null();
1031                    }
1032                    OverlayValue::Error(code) => {
1033                        tag_b.append_value(TypeTag::Error as u8);
1034                        nb.append_null();
1035                        bb.append_null();
1036                        sb.append_null();
1037                        eb.append_value(*code);
1038                        non_err += 1;
1039                    }
1040                    OverlayValue::Pending => {
1041                        tag_b.append_value(TypeTag::Pending as u8);
1042                        nb.append_null();
1043                        bb.append_null();
1044                        sb.append_null();
1045                        eb.append_null();
1046                    }
1047                }
1048            } else {
1049                let tag = TypeTag::from_u8(ch_ref.type_tag.value(i));
1050                match tag {
1051                    TypeTag::Empty => {
1052                        tag_b.append_value(TypeTag::Empty as u8);
1053                        nb.append_null();
1054                        bb.append_null();
1055                        sb.append_null();
1056                        eb.append_null();
1057                    }
1058                    TypeTag::Number | TypeTag::DateTime | TypeTag::Duration => {
1059                        tag_b.append_value(TypeTag::Number as u8);
1060                        if let Some(a) = &ch_ref.numbers {
1061                            let fa = a.as_any().downcast_ref::<Float64Array>().unwrap();
1062                            if fa.is_null(i) {
1063                                nb.append_null();
1064                            } else {
1065                                nb.append_value(fa.value(i));
1066                                non_num += 1;
1067                            }
1068                        } else {
1069                            nb.append_null();
1070                        }
1071                        bb.append_null();
1072                        sb.append_null();
1073                        eb.append_null();
1074                    }
1075                    TypeTag::Boolean => {
1076                        tag_b.append_value(TypeTag::Boolean as u8);
1077                        nb.append_null();
1078                        if let Some(a) = &ch_ref.booleans {
1079                            let ba = a.as_any().downcast_ref::<BooleanArray>().unwrap();
1080                            if ba.is_null(i) {
1081                                bb.append_null();
1082                            } else {
1083                                bb.append_value(ba.value(i));
1084                                non_bool += 1;
1085                            }
1086                        } else {
1087                            bb.append_null();
1088                        }
1089                        sb.append_null();
1090                        eb.append_null();
1091                    }
1092                    TypeTag::Text => {
1093                        tag_b.append_value(TypeTag::Text as u8);
1094                        nb.append_null();
1095                        bb.append_null();
1096                        if let Some(a) = &ch_ref.text {
1097                            let sa = a.as_any().downcast_ref::<StringArray>().unwrap();
1098                            if sa.is_null(i) {
1099                                sb.append_null();
1100                            } else {
1101                                sb.append_value(sa.value(i));
1102                                non_text += 1;
1103                            }
1104                        } else {
1105                            sb.append_null();
1106                        }
1107                        eb.append_null();
1108                    }
1109                    TypeTag::Error => {
1110                        tag_b.append_value(TypeTag::Error as u8);
1111                        nb.append_null();
1112                        bb.append_null();
1113                        sb.append_null();
1114                        if let Some(a) = &ch_ref.errors {
1115                            let ea = a.as_any().downcast_ref::<UInt8Array>().unwrap();
1116                            if ea.is_null(i) {
1117                                eb.append_null();
1118                            } else {
1119                                eb.append_value(ea.value(i));
1120                                non_err += 1;
1121                            }
1122                        } else {
1123                            eb.append_null();
1124                        }
1125                    }
1126                    TypeTag::Pending => {
1127                        tag_b.append_value(TypeTag::Pending as u8);
1128                        nb.append_null();
1129                        bb.append_null();
1130                        sb.append_null();
1131                        eb.append_null();
1132                    }
1133                }
1134            }
1135        }
1136        let tags = Arc::new(tag_b.finish());
1137        let numbers = {
1138            let a = nb.finish();
1139            if non_num == 0 {
1140                None
1141            } else {
1142                Some(Arc::new(a))
1143            }
1144        };
1145        let booleans = {
1146            let a = bb.finish();
1147            if non_bool == 0 {
1148                None
1149            } else {
1150                Some(Arc::new(a))
1151            }
1152        };
1153        let text = {
1154            let a = sb.finish();
1155            if non_text == 0 {
1156                None
1157            } else {
1158                Some(Arc::new(a) as ArrayRef)
1159            }
1160        };
1161        let errors = {
1162            let a = eb.finish();
1163            if non_err == 0 {
1164                None
1165            } else {
1166                Some(Arc::new(a))
1167            }
1168        };
1169        // Swap in rebuilt chunk and clear overlay
1170        let ch_mut = &mut self.columns[col_idx].chunks[ch_idx];
1171        ch_mut.type_tag = tags;
1172        ch_mut.numbers = numbers;
1173        ch_mut.booleans = booleans;
1174        ch_mut.text = text;
1175        ch_mut.errors = errors;
1176        ch_mut.overlay.clear();
1177        ch_mut.meta.len = len;
1178        ch_mut.meta.non_null_num = non_num;
1179        ch_mut.meta.non_null_bool = non_bool;
1180        ch_mut.meta.non_null_text = non_text;
1181        ch_mut.meta.non_null_err = non_err;
1182        true
1183    }
1184
1185    /// Insert `count` rows before absolute 0-based row `before`.
1186    pub fn insert_rows(&mut self, before: usize, count: usize) {
1187        if count == 0 {
1188            return;
1189        }
1190        if self.columns.is_empty() {
1191            // No columns: just extend nrows
1192            self.nrows = self.nrows.saturating_add(count as u32);
1193            return;
1194        }
1195        let total_rows = self.nrows as usize;
1196        let insert_at = before.min(total_rows);
1197        // Locate split chunk and offset
1198        let (ch_idx, in_off) = if insert_at == total_rows && !self.chunk_starts.is_empty() {
1199            // Append after last row: split after last chunk
1200            let last_idx = self.chunk_starts.len() - 1;
1201            let last_len = self.columns[0].chunks[last_idx].type_tag.len();
1202            (last_idx, last_len)
1203        } else {
1204            self.chunk_of_row(insert_at).unwrap_or((0, 0))
1205        };
1206        // Rebuild chunks for each column
1207        for col in &mut self.columns {
1208            let mut new_chunks: Vec<ColumnChunk> = Vec::with_capacity(col.chunks.len() + 1);
1209            for i in 0..col.chunks.len() {
1210                if i != ch_idx {
1211                    new_chunks.push(col.chunks[i].clone());
1212                } else {
1213                    let orig = &col.chunks[i];
1214                    let len = orig.type_tag.len();
1215                    if in_off > 0 {
1216                        new_chunks.push(Self::slice_chunk(orig, 0, in_off));
1217                    }
1218                    new_chunks.push(Self::make_empty_chunk(count));
1219                    if in_off < len {
1220                        new_chunks.push(Self::slice_chunk(orig, in_off, len - in_off));
1221                    }
1222                }
1223            }
1224            col.chunks = new_chunks;
1225        }
1226        self.nrows = (total_rows + count) as u32;
1227        self.recompute_chunk_starts();
1228    }
1229
1230    /// Delete `count` rows starting from absolute 0-based row `start`.
1231    pub fn delete_rows(&mut self, start: usize, count: usize) {
1232        if count == 0 || self.columns.is_empty() || self.nrows == 0 {
1233            return;
1234        }
1235        let total_rows = self.nrows as usize;
1236        if start >= total_rows {
1237            return;
1238        }
1239        let end = (start + count).min(total_rows);
1240        // For each column rebuild chunk list by slicing out deleted window
1241        for col in &mut self.columns {
1242            let mut new_chunks: Vec<ColumnChunk> = Vec::new();
1243            let mut cur_start = 0usize;
1244            for ch in &col.chunks {
1245                let len = ch.type_tag.len();
1246                let ch_end = cur_start + len;
1247                // No overlap
1248                if ch_end <= start || cur_start >= end {
1249                    new_chunks.push(ch.clone());
1250                } else {
1251                    // Overlap exists
1252                    let del_start = start.max(cur_start);
1253                    let del_end = end.min(ch_end);
1254                    let left_len = del_start.saturating_sub(cur_start);
1255                    let right_len = ch_end.saturating_sub(del_end);
1256                    if left_len > 0 {
1257                        new_chunks.push(Self::slice_chunk(ch, 0, left_len));
1258                    }
1259                    if right_len > 0 {
1260                        let off = len - right_len;
1261                        new_chunks.push(Self::slice_chunk(ch, off, right_len));
1262                    }
1263                }
1264                cur_start = ch_end;
1265            }
1266            col.chunks = new_chunks;
1267        }
1268        self.nrows = (total_rows - (end - start)) as u32;
1269        self.recompute_chunk_starts();
1270    }
1271
1272    /// Insert `count` columns before absolute 0-based column `before` with empty chunks.
1273    pub fn insert_columns(&mut self, before: usize, count: usize) {
1274        if count == 0 {
1275            return;
1276        }
1277        // Determine chunk schema from first column if present
1278        let empty_col = |lens: &[usize]| -> ArrowColumn {
1279            let mut chunks = Vec::with_capacity(lens.len());
1280            for &l in lens {
1281                chunks.push(Self::make_empty_chunk(l));
1282            }
1283            ArrowColumn { chunks, index: 0 }
1284        };
1285        let lens: Vec<usize> = if let Some(col0) = self.columns.first() {
1286            col0.chunks.iter().map(|c| c.type_tag.len()).collect()
1287        } else {
1288            // No columns: single chunk matching nrows if any
1289            if self.nrows > 0 {
1290                vec![self.nrows as usize]
1291            } else {
1292                Vec::new()
1293            }
1294        };
1295        let mut cols_new: Vec<ArrowColumn> = Vec::with_capacity(self.columns.len() + count);
1296        let before_idx = before.min(self.columns.len());
1297        for (i, col) in self.columns.iter_mut().enumerate() {
1298            if i == before_idx {
1299                for _ in 0..count {
1300                    cols_new.push(empty_col(&lens));
1301                }
1302            }
1303            cols_new.push(col.clone());
1304        }
1305        if before_idx == self.columns.len() {
1306            for _ in 0..count {
1307                cols_new.push(empty_col(&lens));
1308            }
1309        }
1310        // Fix column indices
1311        for (idx, col) in cols_new.iter_mut().enumerate() {
1312            col.index = idx as u32;
1313        }
1314        self.columns = cols_new;
1315        // chunk_starts unchanged; lens were matched
1316    }
1317
1318    /// Delete `count` columns starting at absolute 0-based column `start`.
1319    pub fn delete_columns(&mut self, start: usize, count: usize) {
1320        if count == 0 || self.columns.is_empty() {
1321            return;
1322        }
1323        let end = (start + count).min(self.columns.len());
1324        if start >= end {
1325            return;
1326        }
1327        self.columns.drain(start..end);
1328        for (idx, col) in self.columns.iter_mut().enumerate() {
1329            col.index = idx as u32;
1330        }
1331    }
1332}
1333
/// Per-column summary produced by `ArrowSheet::shape`.
#[derive(Debug, Clone, Copy)]
pub struct ColumnShape {
    /// The column's stored index.
    pub index: u32,
    /// Number of chunks in the column.
    pub chunks: usize,
    /// Row count of the owning sheet (`nrows`), not a per-column count.
    pub rows: usize,
    /// True if any chunk reports at least one non-null number.
    pub has_num: bool,
    /// True if any chunk reports at least one non-null boolean.
    pub has_bool: bool,
    /// True if any chunk reports at least one non-null text value.
    pub has_text: bool,
    /// True if any chunk reports at least one non-null error code.
    pub has_err: bool,
}
1344
1345impl<'a> ArrowRangeView<'a> {
    /// Absolute 0-based start row of this view.
    pub fn start_row(&self) -> usize {
        self.sr
    }
    /// Absolute 0-based end row of this view (inclusive).
    pub fn end_row(&self) -> usize {
        self.er
    }
    /// Absolute 0-based start column of this view.
    pub fn start_col(&self) -> usize {
        self.sc
    }
    /// Absolute 0-based end column of this view (inclusive).
    pub fn end_col(&self) -> usize {
        self.ec
    }
    /// Owning sheet name.
    pub fn sheet_name(&self) -> &str {
        &self.sheet.name
    }
    /// Dimensions of this view as `(rows, cols)`.
    #[inline]
    pub fn dims(&self) -> (usize, usize) {
        (self.rows, self.cols)
    }
1370
1371    /// Returns a single cell value relative to this view (row/col 0-based).
1372    /// OOB returns Empty. Phase A: Date/Time/Duration come back as Number
1373    /// with the corresponding TypeTag preserved for higher layers.
1374    pub fn get_cell(&self, row: usize, col: usize) -> LiteralValue {
1375        if row >= self.rows || col >= self.cols {
1376            return LiteralValue::Empty;
1377        }
1378        let abs_row = self.sr + row;
1379        let abs_col = self.sc + col;
1380        let sheet_rows = self.sheet.nrows as usize;
1381        if abs_row >= sheet_rows {
1382            return LiteralValue::Empty;
1383        }
1384        if abs_col >= self.sheet.columns.len() {
1385            return LiteralValue::Empty;
1386        }
1387        let col_ref = &self.sheet.columns[abs_col];
1388        // Locate chunk by binary searching start offsets
1389        let ch_idx = match self.chunk_starts.binary_search(&abs_row) {
1390            Ok(i) => i,
1391            Err(0) => 0,
1392            Err(i) => i - 1,
1393        };
1394        if ch_idx >= col_ref.chunks.len() {
1395            return LiteralValue::Empty;
1396        }
1397        let ch = &col_ref.chunks[ch_idx];
1398        let row_start = self.chunk_starts[ch_idx];
1399        let in_off = abs_row - row_start;
1400        // Overlay takes precedence
1401        if let Some(ov) = ch.overlay.get(in_off) {
1402            return match ov {
1403                OverlayValue::Empty => LiteralValue::Empty,
1404                OverlayValue::Number(n) => LiteralValue::Number(*n),
1405                OverlayValue::Boolean(b) => LiteralValue::Boolean(*b),
1406                OverlayValue::Text(s) => LiteralValue::Text((**s).to_string()),
1407                OverlayValue::Error(code) => {
1408                    let kind = unmap_error_code(*code);
1409                    LiteralValue::Error(ExcelError::new(kind))
1410                }
1411                OverlayValue::Pending => LiteralValue::Pending,
1412            };
1413        }
1414        // Read tag and route to lane
1415        let tag_u8 = ch.type_tag.value(in_off);
1416        match TypeTag::from_u8(tag_u8) {
1417            TypeTag::Empty => LiteralValue::Empty,
1418            TypeTag::Number | TypeTag::DateTime | TypeTag::Duration => {
1419                if let Some(arr) = &ch.numbers {
1420                    if arr.is_null(in_off) {
1421                        return LiteralValue::Empty;
1422                    }
1423                    let nums = arr.as_any().downcast_ref::<Float64Array>().unwrap();
1424                    LiteralValue::Number(nums.value(in_off))
1425                } else {
1426                    LiteralValue::Empty
1427                }
1428            }
1429            TypeTag::Boolean => {
1430                if let Some(arr) = &ch.booleans {
1431                    if arr.is_null(in_off) {
1432                        return LiteralValue::Empty;
1433                    }
1434                    let ba = arr.as_any().downcast_ref::<BooleanArray>().unwrap();
1435                    LiteralValue::Boolean(ba.value(in_off))
1436                } else {
1437                    LiteralValue::Empty
1438                }
1439            }
1440            TypeTag::Text => {
1441                if let Some(arr) = &ch.text {
1442                    if arr.is_null(in_off) {
1443                        return LiteralValue::Empty;
1444                    }
1445                    let sa = arr.as_any().downcast_ref::<StringArray>().unwrap();
1446                    LiteralValue::Text(sa.value(in_off).to_string())
1447                } else {
1448                    LiteralValue::Empty
1449                }
1450            }
1451            TypeTag::Error => {
1452                if let Some(arr) = &ch.errors {
1453                    if arr.is_null(in_off) {
1454                        return LiteralValue::Empty;
1455                    }
1456                    let ea = arr.as_any().downcast_ref::<UInt8Array>().unwrap();
1457                    let kind = unmap_error_code(ea.value(in_off));
1458                    LiteralValue::Error(ExcelError::new(kind))
1459                } else {
1460                    LiteralValue::Empty
1461                }
1462            }
1463            TypeTag::Pending => LiteralValue::Pending,
1464        }
1465    }
1466
1467    /// Row-aligned chunk slices within this view. Each item represents
1468    /// a contiguous row segment that lies fully within a single row chunk.
1469    pub fn row_chunk_slices(&self) -> Vec<ChunkSlice> {
1470        let mut out = Vec::new();
1471        if self.rows == 0 || self.cols == 0 {
1472            return out;
1473        }
1474        // Iterate overlapping chunks by row using first column's chunk map
1475        let sheet_rows = self.sheet.nrows as usize;
1476        let row_end = self.er.min(sheet_rows.saturating_sub(1));
1477        if self.chunk_starts.is_empty() {
1478            return out;
1479        }
1480        // For each chunk, compute intersection with [sr..=row_end]
1481        for (ci, &start) in self.chunk_starts.iter().enumerate() {
1482            let len = if ci + 1 < self.chunk_starts.len() {
1483                self.chunk_starts[ci + 1] - start
1484            } else {
1485                // last chunk length from first column
1486                if let Some(col0) = self.sheet.columns.first() {
1487                    col0.chunks[ci].type_tag.len()
1488                } else {
1489                    0
1490                }
1491            };
1492            let end = start + len - 1;
1493            let is = start.max(self.sr);
1494            let ie = end.min(row_end);
1495            if is > ie {
1496                continue;
1497            }
1498            let seg_len = ie - is + 1;
1499            let rel_off = is - start; // offset into chunk arrays
1500            // Collect per-column lane slices for columns in [sc..=ec]
1501            let mut cols = Vec::with_capacity(self.cols);
1502            for col_idx in self.sc..=self.ec {
1503                if col_idx >= self.sheet.columns.len() {
1504                    // Pad out-of-bounds columns with empty (null) lanes and Empty type_tag
1505
1506                    let numbers = Some(new_null_array(&DataType::Float64, seg_len));
1507                    let booleans = Some(new_null_array(&DataType::Boolean, seg_len));
1508                    let text = Some(new_null_array(&DataType::Utf8, seg_len));
1509                    let errors = Some(new_null_array(&DataType::UInt8, seg_len));
1510                    let type_tag: ArrayRef =
1511                        Arc::new(UInt8Array::from(vec![TypeTag::Empty as u8; seg_len]));
1512                    cols.push(ChunkCol {
1513                        numbers,
1514                        booleans,
1515                        text,
1516                        errors,
1517                        type_tag,
1518                    });
1519                } else {
1520                    let col = &self.sheet.columns[col_idx];
1521                    let ch = if ci < col.chunks.len() {
1522                        &col.chunks[ci]
1523                    } else {
1524                        // Should not happen with enforced alignment; pad as OOB if it does
1525                        let numbers = Some(new_null_array(&DataType::Float64, seg_len));
1526                        let booleans = Some(new_null_array(&DataType::Boolean, seg_len));
1527                        let text = Some(new_null_array(&DataType::Utf8, seg_len));
1528                        let errors = Some(new_null_array(&DataType::UInt8, seg_len));
1529                        let type_tag: ArrayRef =
1530                            Arc::new(UInt8Array::from(vec![TypeTag::Empty as u8; seg_len]));
1531                        cols.push(ChunkCol {
1532                            numbers,
1533                            booleans,
1534                            text,
1535                            errors,
1536                            type_tag,
1537                        });
1538                        continue;
1539                    };
1540                    use arrow_array::Array;
1541                    // Always provide a slice, lazily using per-chunk null arrays when the lane is absent
1542                    let numbers_base: ArrayRef = ch.numbers_or_null();
1543                    let booleans_base: ArrayRef = ch.booleans_or_null();
1544                    let text_base: ArrayRef = ch.text_or_null();
1545                    let errors_base: ArrayRef = ch.errors_or_null();
1546                    let numbers = Some(Array::slice(numbers_base.as_ref(), rel_off, seg_len));
1547                    let booleans = Some(Array::slice(booleans_base.as_ref(), rel_off, seg_len));
1548                    let text = Some(Array::slice(text_base.as_ref(), rel_off, seg_len));
1549                    let errors = Some(Array::slice(errors_base.as_ref(), rel_off, seg_len));
1550                    let type_tag: ArrayRef = Array::slice(ch.type_tag.as_ref(), rel_off, seg_len);
1551                    cols.push(ChunkCol {
1552                        numbers,
1553                        booleans,
1554                        text,
1555                        errors,
1556                        type_tag,
1557                    });
1558                }
1559            }
1560            out.push(ChunkSlice {
1561                row_start: is - self.sr,
1562                row_len: seg_len,
1563                cols,
1564            });
1565        }
1566        out
1567    }
1568
1569    /// Convenience iterator over row-aligned chunk slices.
1570    pub fn iter_row_chunks(&'a self) -> impl Iterator<Item = ChunkSlice> + 'a {
1571        self.row_chunk_slices().into_iter()
1572    }
1573
1574    /// Typed numeric slices per row-segment: (row_start, row_len, per-column Float64 arrays)
1575    pub fn numbers_slices(
1576        &'a self,
1577    ) -> impl Iterator<Item = (usize, usize, Vec<Arc<Float64Array>>)> + 'a {
1578        use crate::compute_prelude::zip_select;
1579        self.iter_row_chunks().map(move |cs| {
1580            let mut out_cols: Vec<Arc<Float64Array>> = Vec::with_capacity(cs.cols.len());
1581            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
1582                let base = cs.cols[local_c]
1583                    .numbers
1584                    .as_ref()
1585                    .expect("numbers lane exists")
1586                    .clone();
1587                let base_fa = base
1588                    .as_any()
1589                    .downcast_ref::<Float64Array>()
1590                    .unwrap()
1591                    .clone();
1592                let base_arc = Arc::new(base_fa);
1593
1594                // Identify chunk and overlay segment
1595                let abs_seg_start = self.sr + cs.row_start;
1596                let ch_idx = match self.chunk_starts.binary_search(&abs_seg_start) {
1597                    Ok(i) => i,
1598                    Err(0) => 0,
1599                    Err(i) => i - 1,
1600                };
1601                if col_idx >= self.sheet.columns.len() {
1602                    out_cols.push(base_arc);
1603                    continue;
1604                }
1605                let col = &self.sheet.columns[col_idx];
1606                let ch = &col.chunks[ch_idx];
1607                let rel_off = (self.sr + cs.row_start) - self.chunk_starts[ch_idx];
1608                let seg_range = rel_off..(rel_off + cs.row_len);
1609                if ch.overlay.any_in_range(seg_range.clone()) {
1610                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
1611                    let mut ob = Float64Builder::with_capacity(cs.row_len);
1612                    for i in 0..cs.row_len {
1613                        if let Some(ov) = ch.overlay.get(rel_off + i) {
1614                            mask_b.append_value(true);
1615                            match ov {
1616                                OverlayValue::Number(n) => ob.append_value(*n),
1617                                _ => ob.append_null(),
1618                            }
1619                        } else {
1620                            mask_b.append_value(false);
1621                            ob.append_null();
1622                        }
1623                    }
1624                    let mask = mask_b.finish();
1625                    let overlay_vals = ob.finish();
1626                    let base_fa = base.as_any().downcast_ref::<Float64Array>().unwrap();
1627                    let zipped = zip_select(&mask, &overlay_vals, base_fa).expect("zip overlay");
1628                    let fa = zipped
1629                        .as_any()
1630                        .downcast_ref::<Float64Array>()
1631                        .unwrap()
1632                        .clone();
1633                    out_cols.push(Arc::new(fa));
1634                } else {
1635                    out_cols.push(base_arc);
1636                }
1637            }
1638            (cs.row_start, cs.row_len, out_cols)
1639        })
1640    }
1641
1642    /// Typed boolean slices per row-segment, overlay-aware via zip.
1643    pub fn booleans_slices(
1644        &'a self,
1645    ) -> impl Iterator<Item = (usize, usize, Vec<Arc<BooleanArray>>)> + 'a {
1646        use crate::compute_prelude::zip_select;
1647        self.iter_row_chunks().map(move |cs| {
1648            let mut out_cols: Vec<Arc<BooleanArray>> = Vec::with_capacity(cs.cols.len());
1649            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
1650                let base = cs.cols[local_c]
1651                    .booleans
1652                    .as_ref()
1653                    .expect("booleans lane exists")
1654                    .clone();
1655                let base_ba = base
1656                    .as_any()
1657                    .downcast_ref::<BooleanArray>()
1658                    .unwrap()
1659                    .clone();
1660                let base_arc: Arc<BooleanArray> = Arc::new(base_ba);
1661
1662                let abs_seg_start = self.sr + cs.row_start;
1663                let ch_idx = match self.chunk_starts.binary_search(&abs_seg_start) {
1664                    Ok(i) => i,
1665                    Err(0) => 0,
1666                    Err(i) => i - 1,
1667                };
1668                if col_idx >= self.sheet.columns.len() {
1669                    out_cols.push(base_arc);
1670                    continue;
1671                }
1672                let col = &self.sheet.columns[col_idx];
1673                let ch = &col.chunks[ch_idx];
1674                let rel_off = (self.sr + cs.row_start) - self.chunk_starts[ch_idx];
1675                let seg_range = rel_off..(rel_off + cs.row_len);
1676                if ch.overlay.any_in_range(seg_range.clone()) {
1677                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
1678                    let mut bb = BooleanBuilder::with_capacity(cs.row_len);
1679                    for i in 0..cs.row_len {
1680                        if let Some(ov) = ch.overlay.get(rel_off + i) {
1681                            mask_b.append_value(true);
1682                            match ov {
1683                                OverlayValue::Boolean(b) => bb.append_value(*b),
1684                                _ => bb.append_null(),
1685                            }
1686                        } else {
1687                            mask_b.append_value(false);
1688                            bb.append_null();
1689                        }
1690                    }
1691                    let mask = mask_b.finish();
1692                    let overlay_vals = bb.finish();
1693                    let base_ba = base.as_any().downcast_ref::<BooleanArray>().unwrap();
1694                    let zipped =
1695                        zip_select(&mask, &overlay_vals, base_ba).expect("zip boolean overlay");
1696                    let ba = zipped
1697                        .as_any()
1698                        .downcast_ref::<BooleanArray>()
1699                        .unwrap()
1700                        .clone();
1701                    out_cols.push(Arc::new(ba));
1702                } else {
1703                    out_cols.push(base_arc);
1704                }
1705            }
1706            (cs.row_start, cs.row_len, out_cols)
1707        })
1708    }
1709
1710    /// Text slices per row-segment (erased as ArrayRef for Utf8 today; future Dict/View support).
1711    pub fn text_slices(&'a self) -> impl Iterator<Item = (usize, usize, Vec<ArrayRef>)> + 'a {
1712        use crate::compute_prelude::zip_select;
1713        self.iter_row_chunks().map(move |cs| {
1714            let mut out_cols: Vec<ArrayRef> = Vec::with_capacity(cs.cols.len());
1715            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
1716                let base = cs.cols[local_c]
1717                    .text
1718                    .as_ref()
1719                    .expect("text lane exists")
1720                    .clone();
1721                let abs_seg_start = self.sr + cs.row_start;
1722                let ch_idx = match self.chunk_starts.binary_search(&abs_seg_start) {
1723                    Ok(i) => i,
1724                    Err(0) => 0,
1725                    Err(i) => i - 1,
1726                };
1727                if col_idx >= self.sheet.columns.len() {
1728                    out_cols.push(base.clone());
1729                    continue;
1730                }
1731                let col = &self.sheet.columns[col_idx];
1732                let ch = &col.chunks[ch_idx];
1733                let rel_off = (self.sr + cs.row_start) - self.chunk_starts[ch_idx];
1734                let seg_range = rel_off..(rel_off + cs.row_len);
1735                if ch.overlay.any_in_range(seg_range.clone()) {
1736                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
1737                    let mut sb = StringBuilder::with_capacity(cs.row_len, cs.row_len * 8);
1738                    for i in 0..cs.row_len {
1739                        if let Some(ov) = ch.overlay.get(rel_off + i) {
1740                            mask_b.append_value(true);
1741                            match ov {
1742                                OverlayValue::Text(s) => sb.append_value(s),
1743                                _ => sb.append_null(),
1744                            }
1745                        } else {
1746                            mask_b.append_value(false);
1747                            sb.append_null();
1748                        }
1749                    }
1750                    let mask = mask_b.finish();
1751                    let overlay_vals = sb.finish();
1752                    let base_sa = base.as_any().downcast_ref::<StringArray>().unwrap();
1753                    let zipped =
1754                        zip_select(&mask, &overlay_vals, base_sa).expect("zip text overlay");
1755                    out_cols.push(zipped);
1756                } else {
1757                    out_cols.push(base.clone());
1758                }
1759            }
1760            (cs.row_start, cs.row_len, out_cols)
1761        })
1762    }
1763
1764    /// Typed error-code slices per row-segment.
1765    pub fn errors_slices(
1766        &'a self,
1767    ) -> impl Iterator<Item = (usize, usize, Vec<Arc<UInt8Array>>)> + 'a {
1768        use crate::compute_prelude::zip_select;
1769        self.iter_row_chunks().map(move |cs| {
1770            let mut out_cols: Vec<Arc<UInt8Array>> = Vec::with_capacity(cs.cols.len());
1771            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
1772                let base = cs.cols[local_c]
1773                    .errors
1774                    .as_ref()
1775                    .expect("errors lane exists")
1776                    .clone();
1777                let base_e = base.as_any().downcast_ref::<UInt8Array>().unwrap().clone();
1778                let base_arc: Arc<UInt8Array> = Arc::new(base_e);
1779                let abs_seg_start = self.sr + cs.row_start;
1780                let ch_idx = match self.chunk_starts.binary_search(&abs_seg_start) {
1781                    Ok(i) => i,
1782                    Err(0) => 0,
1783                    Err(i) => i - 1,
1784                };
1785                if col_idx >= self.sheet.columns.len() {
1786                    out_cols.push(base_arc);
1787                    continue;
1788                }
1789                let col = &self.sheet.columns[col_idx];
1790                let ch = &col.chunks[ch_idx];
1791                let rel_off = (self.sr + cs.row_start) - self.chunk_starts[ch_idx];
1792                let seg_range = rel_off..(rel_off + cs.row_len);
1793                if ch.overlay.any_in_range(seg_range.clone()) {
1794                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
1795                    let mut eb = UInt8Builder::with_capacity(cs.row_len);
1796                    for i in 0..cs.row_len {
1797                        if let Some(ov) = ch.overlay.get(rel_off + i) {
1798                            mask_b.append_value(true);
1799                            match ov {
1800                                OverlayValue::Error(code) => eb.append_value(*code),
1801                                _ => eb.append_null(),
1802                            }
1803                        } else {
1804                            mask_b.append_value(false);
1805                            eb.append_null();
1806                        }
1807                    }
1808                    let mask = mask_b.finish();
1809                    let overlay_vals = eb.finish();
1810                    let base_ea = base.as_any().downcast_ref::<UInt8Array>().unwrap();
1811                    let zipped =
1812                        zip_select(&mask, &overlay_vals, base_ea).expect("zip err overlay");
1813                    let ea = zipped
1814                        .as_any()
1815                        .downcast_ref::<UInt8Array>()
1816                        .unwrap()
1817                        .clone();
1818                    out_cols.push(Arc::new(ea));
1819                } else {
1820                    out_cols.push(base_arc);
1821                }
1822            }
1823            (cs.row_start, cs.row_len, out_cols)
1824        })
1825    }
1826
1827    /// Build per-column concatenated lowered text arrays for this view.
1828    /// Uses per-chunk lowered cache for base text and merges overlays via zip_select.
1829    pub fn lowered_text_columns(&self) -> Vec<ArrayRef> {
1830        use arrow_array::Array;
1831        let mut out: Vec<ArrayRef> = Vec::with_capacity(self.cols);
1832        if self.rows == 0 || self.cols == 0 {
1833            return out;
1834        }
1835        let row_end = self.er.min(self.sheet.nrows.saturating_sub(1) as usize);
1836        for col_idx in self.sc..=self.ec {
1837            let mut segs: Vec<ArrayRef> = Vec::new();
1838            if col_idx >= self.sheet.columns.len() {
1839                // OOB: nulls across rows
1840                segs.push(new_null_array(&DataType::Utf8, self.rows));
1841            } else {
1842                let col_ref = &self.sheet.columns[col_idx];
1843                for (ci, &start) in self.chunk_starts.iter().enumerate() {
1844                    // length of this chunk
1845                    let len = col_ref
1846                        .chunks
1847                        .get(ci)
1848                        .map(|c| c.type_tag.len())
1849                        .unwrap_or(0);
1850                    if len == 0 {
1851                        continue;
1852                    }
1853                    let end = start + len - 1;
1854                    let is = start.max(self.sr);
1855                    let ie = end.min(row_end);
1856                    if is > ie {
1857                        continue;
1858                    }
1859                    let seg_len = ie - is + 1;
1860                    let rel_off = is - start;
1861                    if let Some(ch) = col_ref.chunks.get(ci) {
1862                        // Overlay-aware lowered segment
1863                        if ch.overlay.any_in_range(rel_off..(rel_off + seg_len)) {
1864                            // Build lowered overlay values builder
1865                            let mut sb = arrow_array::builder::StringBuilder::with_capacity(
1866                                seg_len,
1867                                seg_len * 8,
1868                            );
1869                            // mask overlaid rows
1870                            let mut mb =
1871                                arrow_array::builder::BooleanBuilder::with_capacity(seg_len);
1872                            for i in 0..seg_len {
1873                                if let Some(ov) = ch.overlay.get(rel_off + i) {
1874                                    match ov {
1875                                        OverlayValue::Text(s) => {
1876                                            sb.append_value(s.to_ascii_lowercase());
1877                                            mb.append_value(true);
1878                                        }
1879                                        OverlayValue::Empty => {
1880                                            sb.append_null();
1881                                            mb.append_value(true);
1882                                        }
1883                                        OverlayValue::Number(n) => {
1884                                            sb.append_value(n.to_string().to_ascii_lowercase());
1885                                            mb.append_value(true);
1886                                        }
1887                                        OverlayValue::Boolean(b) => {
1888                                            sb.append_value(if *b { "true" } else { "false" });
1889                                            mb.append_value(true);
1890                                        }
1891                                        OverlayValue::Error(_) | OverlayValue::Pending => {
1892                                            sb.append_null();
1893                                            mb.append_value(true);
1894                                        }
1895                                    }
1896                                } else {
1897                                    // not overlaid
1898                                    sb.append_null();
1899                                    mb.append_value(false);
1900                                }
1901                            }
1902                            let overlay_vals = sb.finish();
1903                            let mask = mb.finish();
1904                            // base lowered segment
1905                            let base_lowered = ch.text_lower_or_null();
1906                            let base_seg = Array::slice(&base_lowered, rel_off, seg_len);
1907                            let base_sa = base_seg
1908                                .as_any()
1909                                .downcast_ref::<StringArray>()
1910                                .expect("lowered slice downcast");
1911                            let zipped = zip_select(&mask, &overlay_vals, base_sa)
1912                                .expect("zip lowered text overlay");
1913                            segs.push(zipped);
1914                        } else {
1915                            // No overlay: slice from lowered base
1916                            let lowered = ch.text_lower_or_null();
1917                            segs.push(Array::slice(&lowered, rel_off, seg_len));
1918                        }
1919                    }
1920                }
1921            }
1922            // Concat segments for this column
1923            let anys: Vec<&dyn Array> = segs.iter().map(|a| a.as_ref() as &dyn Array).collect();
1924            let conc = concat_arrays(&anys).expect("concat lowered segments");
1925            out.push(conc);
1926        }
1927        out
1928    }
1929}
1930
1931pub struct ChunkSlice {
1932    pub row_start: usize, // relative to view top
1933    pub row_len: usize,
1934    pub cols: Vec<ChunkCol>,
1935}
1936
1937pub struct ChunkCol {
1938    pub numbers: Option<ArrayRef>,
1939    pub booleans: Option<ArrayRef>,
1940    pub text: Option<ArrayRef>,
1941    pub errors: Option<ArrayRef>,
1942    pub type_tag: ArrayRef,
1943}
1944
1945#[cfg(test)]
1946mod tests {
1947    use super::*;
1948    use arrow_array::Array;
1949    use arrow_schema::DataType;
1950
1951    #[test]
1952    fn ingest_mixed_rows_into_lanes_and_tags() {
1953        let mut b = IngestBuilder::new("Sheet1", 1, 1024, crate::engine::DateSystem::Excel1900);
1954        let data = vec![
1955            LiteralValue::Number(42.5),                   // Number
1956            LiteralValue::Empty,                          // Empty
1957            LiteralValue::Text(String::new()),            // Empty text (Text lane)
1958            LiteralValue::Boolean(true),                  // Boolean
1959            LiteralValue::Error(ExcelError::new_value()), // Error
1960        ];
1961        for v in &data {
1962            b.append_row(std::slice::from_ref(v)).unwrap();
1963        }
1964        let sheet = b.finish();
1965        assert_eq!(sheet.nrows, 5);
1966        assert_eq!(sheet.columns.len(), 1);
1967        assert_eq!(sheet.columns[0].chunks.len(), 1);
1968        let ch = &sheet.columns[0].chunks[0];
1969
1970        // Type tags
1971        let tags = ch.type_tag.values();
1972        assert_eq!(tags.len(), 5);
1973        assert_eq!(tags[0], TypeTag::Number as u8);
1974        assert_eq!(tags[1], TypeTag::Empty as u8);
1975        assert_eq!(tags[2], TypeTag::Text as u8);
1976        assert_eq!(tags[3], TypeTag::Boolean as u8);
1977        assert_eq!(tags[4], TypeTag::Error as u8);
1978
1979        // Numbers lane validity
1980        let nums = ch.numbers.as_ref().unwrap();
1981        assert_eq!(nums.len(), 5);
1982        assert_eq!(nums.null_count(), 4);
1983        assert!(nums.is_valid(0));
1984
1985        // Booleans lane validity
1986        let bools = ch.booleans.as_ref().unwrap();
1987        assert_eq!(bools.len(), 5);
1988        assert_eq!(bools.null_count(), 4);
1989        assert!(bools.is_valid(3));
1990
1991        // Text lane validity
1992        let txt = ch.text.as_ref().unwrap();
1993        assert_eq!(txt.len(), 5);
1994        assert_eq!(txt.null_count(), 4);
1995        assert!(txt.is_valid(2)); // ""
1996
1997        // Errors lane
1998        let errs = ch.errors.as_ref().unwrap();
1999        assert_eq!(errs.len(), 5);
2000        assert_eq!(errs.null_count(), 4);
2001        assert!(errs.is_valid(4));
2002    }
2003
2004    #[test]
2005    fn range_view_get_cell_and_padding() {
2006        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
2007        b.append_row(&[LiteralValue::Number(1.0), LiteralValue::Text("".into())])
2008            .unwrap();
2009        b.append_row(&[LiteralValue::Empty, LiteralValue::Text("x".into())])
2010            .unwrap();
2011        b.append_row(&[LiteralValue::Boolean(true), LiteralValue::Empty])
2012            .unwrap();
2013        let sheet = b.finish();
2014        let rv = sheet.range_view(0, 0, 2, 1);
2015        assert_eq!(rv.dims(), (3, 2));
2016        // Inside
2017        assert_eq!(rv.get_cell(0, 0), LiteralValue::Number(1.0));
2018        assert_eq!(rv.get_cell(0, 1), LiteralValue::Text(String::new())); // empty string
2019        assert_eq!(rv.get_cell(1, 0), LiteralValue::Empty); // truly Empty
2020        assert_eq!(rv.get_cell(2, 0), LiteralValue::Boolean(true));
2021        // OOB padding
2022        assert_eq!(rv.get_cell(3, 0), LiteralValue::Empty);
2023        assert_eq!(rv.get_cell(0, 2), LiteralValue::Empty);
2024
2025        // Numbers slices should produce one 2-row and one 1-row segment
2026        let nums: Vec<_> = rv.numbers_slices().collect();
2027        assert_eq!(nums.len(), 2);
2028        assert_eq!(nums[0].0, 0);
2029        assert_eq!(nums[0].1, 2);
2030        assert_eq!(nums[1].0, 2);
2031        assert_eq!(nums[1].1, 1);
2032    }
2033
2034    #[test]
2035    fn row_chunk_slices_shape() {
2036        // chunk_rows=2 leads to two slices for 3 rows
2037        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
2038        b.append_row(&[LiteralValue::Text("a".into()), LiteralValue::Number(1.0)])
2039            .unwrap();
2040        b.append_row(&[LiteralValue::Text("b".into()), LiteralValue::Number(2.0)])
2041            .unwrap();
2042        b.append_row(&[LiteralValue::Text("c".into()), LiteralValue::Number(3.0)])
2043            .unwrap();
2044        let sheet = b.finish();
2045        let rv = sheet.range_view(0, 0, 2, 1);
2046        let slices = rv.row_chunk_slices();
2047        assert_eq!(slices.len(), 2);
2048        assert_eq!(slices[0].row_start, 0);
2049        assert_eq!(slices[0].row_len, 2);
2050        assert_eq!(slices[0].cols.len(), 2);
2051        assert_eq!(slices[1].row_start, 2);
2052        assert_eq!(slices[1].row_len, 1);
2053        assert_eq!(slices[1].cols.len(), 2);
2054    }
2055
2056    #[test]
2057    fn oob_columns_are_padded() {
2058        // Build with 2 columns; request 3 columns (ec beyond last col)
2059        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
2060        b.append_row(&[LiteralValue::Number(1.0), LiteralValue::Text("a".into())])
2061            .unwrap();
2062        b.append_row(&[LiteralValue::Number(2.0), LiteralValue::Text("b".into())])
2063            .unwrap();
2064        let sheet = b.finish();
2065        // Request cols [0..=2] → 3 columns with padding
2066        let rv = sheet.range_view(0, 0, 1, 2);
2067        assert_eq!(rv.dims(), (2, 3));
2068        let slices = rv.row_chunk_slices();
2069        assert!(!slices.is_empty());
2070        for cs in &slices {
2071            assert_eq!(cs.cols.len(), 3);
2072        }
2073        // Also validate typed slices return 3 entries per segment
2074        for (_rs, _rl, cols) in rv.numbers_slices() {
2075            assert_eq!(cols.len(), 3);
2076        }
2077        for (_rs, _rl, cols) in rv.booleans_slices() {
2078            assert_eq!(cols.len(), 3);
2079        }
2080        for (_rs, _rl, cols) in rv.text_slices() {
2081            assert_eq!(cols.len(), 3);
2082        }
2083        for (_rs, _rl, cols) in rv.errors_slices() {
2084            assert_eq!(cols.len(), 3);
2085        }
2086    }
2087
2088    #[test]
2089    fn reversed_range_is_empty() {
2090        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
2091        b.append_row(&[LiteralValue::Number(1.0)]).unwrap();
2092        b.append_row(&[LiteralValue::Number(2.0)]).unwrap();
2093        let sheet = b.finish();
2094        let rv = sheet.range_view(3, 0, 1, 0); // er < sr
2095        assert_eq!(rv.dims(), (0, 0));
2096        assert!(rv.row_chunk_slices().is_empty());
2097        assert_eq!(rv.get_cell(0, 0), LiteralValue::Empty);
2098    }
2099
2100    #[test]
2101    fn chunk_alignment_invariant() {
2102        let mut b = IngestBuilder::new("S", 3, 2, crate::engine::DateSystem::Excel1900);
2103        // 5 rows, 2-row chunks => 3 chunks (2,2,1)
2104        for r in 0..5 {
2105            b.append_row(&[
2106                LiteralValue::Number(r as f64),
2107                LiteralValue::Text(format!("{r}")),
2108                if r % 2 == 0 {
2109                    LiteralValue::Empty
2110                } else {
2111                    LiteralValue::Boolean(true)
2112                },
2113            ])
2114            .unwrap();
2115        }
2116        let sheet = b.finish();
2117        // chunk_starts should be [0,2,4]
2118        assert_eq!(sheet.chunk_starts, vec![0, 2, 4]);
2119        // All columns must share per-chunk lengths equal to [2,2,1]
2120        let lens0: Vec<usize> = sheet.columns[0]
2121            .chunks
2122            .iter()
2123            .map(|ch| ch.type_tag.len())
2124            .collect();
2125        for col in &sheet.columns[1..] {
2126            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2127            assert_eq!(lens, lens0);
2128        }
2129    }
2130
2131    #[test]
2132    fn chunking_splits_rows() {
2133        // Two columns, chunk size 2 → expect two chunks
2134        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
2135        let rows = vec![
2136            vec![LiteralValue::Number(1.0), LiteralValue::Text("a".into())],
2137            vec![LiteralValue::Empty, LiteralValue::Text("b".into())],
2138            vec![LiteralValue::Boolean(true), LiteralValue::Empty],
2139        ];
2140        for r in rows {
2141            b.append_row(&r).unwrap();
2142        }
2143        let sheet = b.finish();
2144        assert_eq!(sheet.columns[0].chunks.len(), 2);
2145        assert_eq!(sheet.columns[1].chunks.len(), 2);
2146        assert_eq!(sheet.columns[0].chunks[0].numbers_or_null().len(), 2);
2147        assert_eq!(sheet.columns[0].chunks[1].numbers_or_null().len(), 1);
2148    }
2149
2150    #[test]
2151    fn pending_is_not_error() {
2152        let mut b = IngestBuilder::new("S", 1, 8, crate::engine::DateSystem::Excel1900);
2153        b.append_row(&[LiteralValue::Pending]).unwrap();
2154        let sheet = b.finish();
2155        let ch = &sheet.columns[0].chunks[0];
2156        // tag is Pending
2157        assert_eq!(ch.type_tag.values()[0], super::TypeTag::Pending as u8);
2158        // errors lane is effectively null
2159        let errs = ch.errors_or_null();
2160        assert_eq!(errs.null_count(), 1);
2161    }
2162
2163    #[test]
2164    fn all_null_numeric_lane_uses_null_array() {
2165        // Only text values in first column → numbers lane should be all null with correct dtype
2166        let mut b = IngestBuilder::new("S", 1, 16, crate::engine::DateSystem::Excel1900);
2167        b.append_row(&[LiteralValue::Text("a".into())]).unwrap();
2168        b.append_row(&[LiteralValue::Text("".into())]).unwrap();
2169        b.append_row(&[LiteralValue::Text("b".into())]).unwrap();
2170        let sheet = b.finish();
2171        let ch = &sheet.columns[0].chunks[0];
2172        let nums = ch.numbers_or_null();
2173        assert_eq!(nums.len(), 3);
2174        assert_eq!(nums.null_count(), 3);
2175        assert_eq!(nums.data_type(), &DataType::Float64);
2176    }
2177
2178    #[test]
2179    fn row_insert_delete_across_chunk_boundaries_with_overlays() {
2180        // Build 1 column, chunk size 4, 10 rows -> chunks at [0..4],[4..8],[8..10]
2181        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
2182        for _ in 0..10 {
2183            b.append_row(&[LiteralValue::Empty]).unwrap();
2184        }
2185        let mut sheet = b.finish();
2186        // Add overlays at row 3 and row 4
2187        {
2188            let (c0, o0) = sheet.chunk_of_row(3).unwrap();
2189            sheet.columns[0].chunks[c0]
2190                .overlay
2191                .set(o0, OverlayValue::Number(30.0));
2192            let (c1, o1) = sheet.chunk_of_row(4).unwrap();
2193            sheet.columns[0].chunks[c1]
2194                .overlay
2195                .set(o1, OverlayValue::Number(40.0));
2196        }
2197        // Insert 2 rows before row 4 (at chunk boundary)
2198        sheet.insert_rows(4, 2);
2199        assert_eq!(sheet.nrows, 12);
2200        // Validate overlays moved correctly: 3 stays, 4 becomes Empty, 6 has 40
2201        let av = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2202        assert_eq!(av.get_cell(3, 0), LiteralValue::Number(30.0));
2203        assert_eq!(av.get_cell(4, 0), LiteralValue::Empty);
2204        assert_eq!(av.get_cell(6, 0), LiteralValue::Number(40.0));
2205
2206        // Now delete 3 rows starting at 3: removes rows 3,4,5 → moves 40.0 from 6 → 3
2207        sheet.delete_rows(3, 3);
2208        assert_eq!(sheet.nrows, 9);
2209        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2210        assert_eq!(av2.get_cell(3, 0), LiteralValue::Number(40.0));
2211        // All columns share chunk lengths; chunk_starts monotonic and cover nrows
2212        let lens0: Vec<usize> = sheet.columns[0]
2213            .chunks
2214            .iter()
2215            .map(|ch| ch.type_tag.len())
2216            .collect();
2217        for col in &sheet.columns {
2218            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2219            assert_eq!(lens, lens0);
2220        }
2221        // chunk_starts should be monotonic and final chunk end == nrows
2222        assert!(sheet.chunk_starts.windows(2).all(|w| w[0] < w[1]));
2223        let last_start = *sheet.chunk_starts.last().unwrap_or(&0);
2224        let last_len = sheet.columns[0]
2225            .chunks
2226            .last()
2227            .map(|c| c.type_tag.len())
2228            .unwrap_or(0);
2229        assert_eq!(last_start + last_len, sheet.nrows as usize);
2230    }
2231
2232    #[test]
2233    fn column_insert_delete_retains_chunk_alignment() {
2234        let mut b = IngestBuilder::new("S", 3, 3, crate::engine::DateSystem::Excel1900);
2235        for _ in 0..5 {
2236            b.append_row(&[
2237                LiteralValue::Empty,
2238                LiteralValue::Empty,
2239                LiteralValue::Empty,
2240            ])
2241            .unwrap();
2242        }
2243        let mut sheet = b.finish();
2244        // Record reference chunk lengths of first column
2245        let ref_lens: Vec<usize> = sheet.columns[0]
2246            .chunks
2247            .iter()
2248            .map(|ch| ch.type_tag.len())
2249            .collect();
2250        // Insert 2 columns before index 1
2251        sheet.insert_columns(1, 2);
2252        assert_eq!(sheet.columns.len(), 5);
2253        for col in &sheet.columns {
2254            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2255            assert_eq!(lens, ref_lens);
2256        }
2257        let starts_before = sheet.chunk_starts.clone();
2258        // Delete 2 columns starting at index 2 → back to 3 columns
2259        sheet.delete_columns(2, 2);
2260        assert_eq!(sheet.columns.len(), 3);
2261        for col in &sheet.columns {
2262            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2263            assert_eq!(lens, ref_lens);
2264        }
2265        // chunk_starts unchanged by column operations
2266        assert_eq!(sheet.chunk_starts, starts_before);
2267    }
2268
2269    #[test]
2270    fn multiple_adjacent_row_ops_overlay_mixed_types() {
2271        use formualizer_common::ExcelErrorKind;
2272        // Two columns to ensure alignment preserved across columns
2273        let mut b = IngestBuilder::new("S", 2, 3, crate::engine::DateSystem::Excel1900);
2274        for _ in 0..9 {
2275            b.append_row(&[LiteralValue::Empty, LiteralValue::Empty])
2276                .unwrap();
2277        }
2278        let mut sheet = b.finish();
2279        // Overlays at rows (0-based): 2->Number, 3->Text, 5->Boolean, 6->Error, 8->Empty
2280        // Column 0 only
2281        let set_ov = |sh: &mut ArrowSheet, row: usize, ov: OverlayValue| {
2282            let (ch_i, off) = sh.chunk_of_row(row).unwrap();
2283            sh.columns[0].chunks[ch_i].overlay.set(off, ov);
2284        };
2285        set_ov(&mut sheet, 2, OverlayValue::Number(12.5));
2286        set_ov(&mut sheet, 3, OverlayValue::Text(Arc::from("hello")));
2287        set_ov(&mut sheet, 5, OverlayValue::Boolean(true));
2288        set_ov(
2289            &mut sheet,
2290            6,
2291            OverlayValue::Error(map_error_code(ExcelErrorKind::Div)),
2292        );
2293        set_ov(&mut sheet, 8, OverlayValue::Empty);
2294
2295        // Insert 1 row before index 3
2296        sheet.insert_rows(3, 1);
2297        // Expected new positions: 2->2 (unchanged), 3->4, 5->6, 6->7, 8->9
2298        let av1 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2299        assert_eq!(av1.get_cell(2, 0), LiteralValue::Number(12.5));
2300        assert_eq!(av1.get_cell(4, 0), LiteralValue::Text("hello".into()));
2301        assert_eq!(av1.get_cell(6, 0), LiteralValue::Boolean(true));
2302        match av1.get_cell(7, 0) {
2303            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
2304            other => panic!("expected error at row 7, got {other:?}"),
2305        }
2306        assert_eq!(av1.get_cell(9, 0), LiteralValue::Empty);
2307
2308        // Insert 2 rows before index 4 (adjacent to previous region)
2309        sheet.insert_rows(4, 2);
2310        // Now positions: 2->2, 4->6, 6->8, 7->9, 9->11
2311        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2312        assert_eq!(av2.get_cell(2, 0), LiteralValue::Number(12.5));
2313        assert_eq!(av2.get_cell(6, 0), LiteralValue::Text("hello".into()));
2314        assert_eq!(av2.get_cell(8, 0), LiteralValue::Boolean(true));
2315        match av2.get_cell(9, 0) {
2316            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
2317            other => panic!("expected error at row 9, got {other:?}"),
2318        }
2319        assert_eq!(av2.get_cell(11, 0), LiteralValue::Empty);
2320
2321        // Delete 2 rows starting at index 6 → removes the text at 6 and one empty row
2322        sheet.delete_rows(6, 2);
2323        let av3 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2324        // Remaining expected: 2->Number 12.5, 6 (was 8)->true, 7 (was 9)->#DIV/0!, 9 (was 11)->Empty
2325        assert_eq!(av3.get_cell(2, 0), LiteralValue::Number(12.5));
2326        assert_eq!(av3.get_cell(6, 0), LiteralValue::Boolean(true));
2327        match av3.get_cell(7, 0) {
2328            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
2329            other => panic!("expected error at row 8, got {other:?}"),
2330        }
2331        assert_eq!(av3.get_cell(9, 0), LiteralValue::Empty);
2332
2333        // Alignment checks
2334        let lens0: Vec<usize> = sheet.columns[0]
2335            .chunks
2336            .iter()
2337            .map(|ch| ch.type_tag.len())
2338            .collect();
2339        for col in &sheet.columns {
2340            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2341            assert_eq!(lens, lens0);
2342        }
2343        // chunk_starts monotonically increasing and cover nrows
2344        assert!(sheet.chunk_starts.windows(2).all(|w| w[0] < w[1]));
2345        let last_start = *sheet.chunk_starts.last().unwrap_or(&0);
2346        let last_len = sheet.columns[0]
2347            .chunks
2348            .last()
2349            .map(|c| c.type_tag.len())
2350            .unwrap_or(0);
2351        assert_eq!(last_start + last_len, sheet.nrows as usize);
2352    }
2353
2354    #[test]
2355    fn multiple_adjacent_column_ops_alignment() {
2356        // Start with 2 columns, chunk_rows=2, rows=5
2357        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
2358        for _ in 0..5 {
2359            b.append_row(&[LiteralValue::Empty, LiteralValue::Empty])
2360                .unwrap();
2361        }
2362        let mut sheet = b.finish();
2363        let ref_lens: Vec<usize> = sheet.columns[0]
2364            .chunks
2365            .iter()
2366            .map(|ch| ch.type_tag.len())
2367            .collect();
2368        // Insert 1 at start, then 2 at index 2 → columns = 5
2369        sheet.insert_columns(0, 1);
2370        sheet.insert_columns(2, 2);
2371        assert_eq!(sheet.columns.len(), 5);
2372        for col in &sheet.columns {
2373            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2374            assert_eq!(lens, ref_lens);
2375        }
2376        let starts_before = sheet.chunk_starts.clone();
2377        // Delete 1 at index 1, then 2 at the end if available
2378        sheet.delete_columns(1, 1);
2379        let remain = sheet.columns.len();
2380        if remain >= 3 {
2381            sheet.delete_columns(remain - 2, 2);
2382        }
2383        for col in &sheet.columns {
2384            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2385            assert_eq!(lens, ref_lens);
2386        }
2387        assert_eq!(sheet.chunk_starts, starts_before);
2388    }
2389
2390    #[test]
2391    fn overlays_on_multiple_columns_row_col_ops() {
2392        // 3 columns, chunk_rows=3, rows=6 → chunks [0..3), [3..6)
2393        let mut b = IngestBuilder::new("S", 3, 3, crate::engine::DateSystem::Excel1900);
2394        for _ in 0..6 {
2395            b.append_row(&[
2396                LiteralValue::Empty,
2397                LiteralValue::Empty,
2398                LiteralValue::Empty,
2399            ])
2400            .unwrap();
2401        }
2402        let mut sheet = b.finish();
2403        // Overlays at row2 and row3 across columns with different types
2404        let set_ov = |sh: &mut ArrowSheet, col: usize, row: usize, ov: OverlayValue| {
2405            let (ch_i, off) = sh.chunk_of_row(row).unwrap();
2406            sh.columns[col].chunks[ch_i].overlay.set(off, ov);
2407        };
2408        set_ov(&mut sheet, 0, 2, OverlayValue::Number(12.0));
2409        set_ov(&mut sheet, 1, 2, OverlayValue::Text(Arc::from("xx")));
2410        set_ov(&mut sheet, 2, 2, OverlayValue::Boolean(true));
2411        set_ov(&mut sheet, 0, 3, OverlayValue::Number(33.0));
2412        set_ov(&mut sheet, 1, 3, OverlayValue::Text(Arc::from("yy")));
2413        set_ov(&mut sheet, 2, 3, OverlayValue::Boolean(false));
2414
2415        // Insert a row at boundary (before row index 3)
2416        sheet.insert_rows(3, 1);
2417        // Now original row>=3 shift down by 1
2418        let av = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 2);
2419        // Row 2 values unchanged
2420        assert_eq!(av.get_cell(2, 0), LiteralValue::Number(12.0));
2421        assert_eq!(av.get_cell(2, 1), LiteralValue::Text("xx".into()));
2422        assert_eq!(av.get_cell(2, 2), LiteralValue::Boolean(true));
2423        // Row 3 became Empty (inserted)
2424        assert_eq!(av.get_cell(3, 0), LiteralValue::Empty);
2425        // Row 4 holds old row 3 overlays
2426        assert_eq!(av.get_cell(4, 0), LiteralValue::Number(33.0));
2427        assert_eq!(av.get_cell(4, 1), LiteralValue::Text("yy".into()));
2428        assert_eq!(av.get_cell(4, 2), LiteralValue::Boolean(false));
2429
2430        // Delete column 1 (middle), values shift left
2431        sheet.delete_columns(1, 1);
2432        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 1);
2433        assert_eq!(av2.get_cell(2, 0), LiteralValue::Number(12.0));
2434        // Column 1 now was old column 2
2435        assert_eq!(av2.get_cell(2, 1), LiteralValue::Boolean(true));
2436        assert_eq!(av2.get_cell(4, 0), LiteralValue::Number(33.0));
2437        assert_eq!(av2.get_cell(4, 1), LiteralValue::Boolean(false));
2438
2439        // Alignment preserved
2440        let lens0: Vec<usize> = sheet.columns[0]
2441            .chunks
2442            .iter()
2443            .map(|ch| ch.type_tag.len())
2444            .collect();
2445        for col in &sheet.columns {
2446            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2447            assert_eq!(lens, lens0);
2448        }
2449    }
2450
2451    #[test]
2452    fn effective_slices_overlay_precedence_numbers_text() {
2453        // 1 column, chunk_rows=3, rows=6. Base numbers in lane; overlays include text on row1 and number on row4.
2454        let mut b = IngestBuilder::new("S", 1, 3, crate::engine::DateSystem::Excel1900);
2455        for i in 0..6 {
2456            b.append_row(&[LiteralValue::Number((i + 1) as f64)])
2457                .unwrap();
2458        }
2459        let mut sheet = b.finish();
2460        // Overlays: row1 -> Text("X"), row4 -> Number(99)
2461        let (c1, o1) = sheet.chunk_of_row(1).unwrap();
2462        sheet.columns[0].chunks[c1]
2463            .overlay
2464            .set(o1, OverlayValue::Text(Arc::from("X")));
2465        let (c4, o4) = sheet.chunk_of_row(4).unwrap();
2466        sheet.columns[0].chunks[c4]
2467            .overlay
2468            .set(o4, OverlayValue::Number(99.0));
2469
2470        let av = sheet.range_view(0, 0, 5, 0);
2471        // Validate numbers_slices: row1 should be null (text overlay), row4 should be 99.0, others base
2472        let mut numeric: Vec<Option<f64>> = vec![None; 6];
2473        for (row_start, row_len, cols) in av.numbers_slices() {
2474            let a = &cols[0];
2475            for i in 0..row_len {
2476                let idx = row_start + i;
2477                numeric[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
2478            }
2479        }
2480        assert_eq!(numeric[0], Some(1.0));
2481        assert_eq!(numeric[1], None); // overshadowed by text overlay
2482        assert_eq!(numeric[2], Some(3.0));
2483        assert_eq!(numeric[3], Some(4.0));
2484        assert_eq!(numeric[4], Some(99.0));
2485        assert_eq!(numeric[5], Some(6.0));
2486
2487        // Validate text_slices: row1 has "X", others null
2488        let mut texts: Vec<Option<String>> = vec![None; 6];
2489        for (row_start, row_len, cols) in av.text_slices() {
2490            let a = cols[0].as_any().downcast_ref::<StringArray>().unwrap();
2491            for i in 0..row_len {
2492                let idx = row_start + i;
2493                texts[idx] = if a.is_null(i) {
2494                    None
2495                } else {
2496                    Some(a.value(i).to_string())
2497                };
2498            }
2499        }
2500        assert_eq!(texts[1].as_deref(), Some("X"));
2501        assert!(texts[0].is_none());
2502        assert!(texts[2].is_none());
2503        assert!(texts[3].is_none());
2504        assert!(texts[4].is_none());
2505        assert!(texts[5].is_none());
2506    }
2507
2508    #[test]
2509    fn effective_slices_overlay_precedence_booleans() {
2510        // Base booleans over 1 column; overlays include boolean and non-boolean types.
2511        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
2512        for i in 0..6 {
2513            let v = if i % 2 == 0 {
2514                LiteralValue::Boolean(true)
2515            } else {
2516                LiteralValue::Boolean(false)
2517            };
2518            b.append_row(&[v]).unwrap();
2519        }
2520        let mut sheet = b.finish();
2521        // Overlays: row1 -> Boolean(true), row2 -> Text("T")
2522        let (c1, o1) = sheet.chunk_of_row(1).unwrap();
2523        sheet.columns[0].chunks[c1]
2524            .overlay
2525            .set(o1, OverlayValue::Boolean(true));
2526        let (c2, o2) = sheet.chunk_of_row(2).unwrap();
2527        sheet.columns[0].chunks[c2]
2528            .overlay
2529            .set(o2, OverlayValue::Text(Arc::from("T")));
2530
2531        let av = sheet.range_view(0, 0, 5, 0);
2532        // Validate booleans_slices: row1 should be true (overlay), row2 should be null (text overlay), others base
2533        let mut bools: Vec<Option<bool>> = vec![None; 6];
2534        for (row_start, row_len, cols) in av.booleans_slices() {
2535            let a = &cols[0];
2536            for i in 0..row_len {
2537                let idx = row_start + i;
2538                bools[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
2539            }
2540        }
2541        assert_eq!(bools[0], Some(true));
2542        assert_eq!(bools[1], Some(true)); // overlay to true
2543        assert_eq!(bools[2], None); // overshadowed by text overlay
2544        // spot-check others remain base
2545        assert_eq!(bools[3], Some(false));
2546    }
2547
2548    #[test]
2549    fn effective_slices_overlay_precedence_errors() {
2550        // Base numbers; overlay an error at one row and ensure errors_slices reflect it.
2551        let mut b = IngestBuilder::new("S", 1, 3, crate::engine::DateSystem::Excel1900);
2552        for i in 0..6 {
2553            b.append_row(&[LiteralValue::Number((i + 1) as f64)])
2554                .unwrap();
2555        }
2556        let mut sheet = b.finish();
2557        // Overlay error at row 4
2558        let (c4, o4) = sheet.chunk_of_row(4).unwrap();
2559        sheet.columns[0].chunks[c4]
2560            .overlay
2561            .set(o4, OverlayValue::Error(map_error_code(ExcelErrorKind::Div)));
2562
2563        let av = sheet.range_view(0, 0, 5, 0);
2564        let mut errs: Vec<Option<u8>> = vec![None; 6];
2565        for (row_start, row_len, cols) in av.errors_slices() {
2566            let a = &cols[0];
2567            for i in 0..row_len {
2568                let idx = row_start + i;
2569                errs[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
2570            }
2571        }
2572        assert_eq!(errs[4], Some(map_error_code(ExcelErrorKind::Div)));
2573        assert!(errs[3].is_none());
2574    }
2575}