formualizer_eval/arrow_store/
mod.rs

1use crate::compute_prelude::{concat_arrays, zip_select};
2use arrow_array::Array;
3use arrow_array::new_null_array;
4use arrow_schema::DataType;
5use chrono::Timelike;
6use std::sync::Arc;
7
8use arrow_array::builder::{BooleanBuilder, Float64Builder, StringBuilder, UInt8Builder};
9use arrow_array::{ArrayRef, BooleanArray, Float64Array, StringArray, UInt8Array, UInt32Array};
10use once_cell::sync::OnceCell;
11
12use formualizer_common::{ExcelError, ExcelErrorKind, LiteralValue};
13use std::collections::HashMap;
14
/// One-byte discriminator stored per row that says which value lane the row occupies.
///
/// The explicit discriminants are the exact bytes written to the `UInt8` tag array, so
/// they must stay stable.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum TypeTag {
    /// No value stored for the row.
    Empty = 0,
    /// Numeric value.
    Number = 1,
    /// Boolean value.
    Boolean = 2,
    /// Text value.
    Text = 3,
    /// Error value.
    Error = 4,
    /// Temporal value (reserved for future temporal lanes).
    DateTime = 5,
    /// Duration value (reserved).
    Duration = 6,
    /// Value not yet available.
    Pending = 7,
}
28
29impl TypeTag {
30    fn from_value(v: &LiteralValue) -> Self {
31        match v {
32            LiteralValue::Empty => TypeTag::Empty,
33            LiteralValue::Int(_) | LiteralValue::Number(_) => TypeTag::Number,
34            LiteralValue::Boolean(_) => TypeTag::Boolean,
35            LiteralValue::Text(_) => TypeTag::Text,
36            LiteralValue::Error(_) => TypeTag::Error,
37            LiteralValue::Date(_) | LiteralValue::DateTime(_) | LiteralValue::Time(_) => {
38                TypeTag::DateTime
39            }
40            LiteralValue::Duration(_) => TypeTag::Duration,
41            LiteralValue::Pending => TypeTag::Pending,
42            LiteralValue::Array(_) => TypeTag::Error, // arrays not storable in a single cell lane
43        }
44    }
45}
46
47impl TypeTag {
48    #[inline]
49    pub fn from_u8(b: u8) -> Self {
50        match b {
51            x if x == TypeTag::Empty as u8 => TypeTag::Empty,
52            x if x == TypeTag::Number as u8 => TypeTag::Number,
53            x if x == TypeTag::Boolean as u8 => TypeTag::Boolean,
54            x if x == TypeTag::Text as u8 => TypeTag::Text,
55            x if x == TypeTag::Error as u8 => TypeTag::Error,
56            x if x == TypeTag::DateTime as u8 => TypeTag::DateTime,
57            x if x == TypeTag::Duration as u8 => TypeTag::Duration,
58            x if x == TypeTag::Pending as u8 => TypeTag::Pending,
59            _ => TypeTag::Empty,
60        }
61    }
62}
63
/// Per-chunk summary: row count plus the number of non-null entries in each value lane.
#[derive(Clone, Copy, Debug, Default)]
pub struct ColumnChunkMeta {
    /// Number of rows in the chunk.
    pub len: usize,
    /// Non-null entries in the numeric lane.
    pub non_null_num: usize,
    /// Non-null entries in the boolean lane.
    pub non_null_bool: usize,
    /// Non-null entries in the text lane.
    pub non_null_text: usize,
    /// Non-null entries in the error lane.
    pub non_null_err: usize,
}
72
73#[derive(Debug, Clone)]
74pub struct ColumnChunk {
75    pub numbers: Option<Arc<Float64Array>>,
76    pub booleans: Option<Arc<BooleanArray>>,
77    pub text: Option<ArrayRef>,          // Utf8 for Phase A
78    pub errors: Option<Arc<UInt8Array>>, // compact error code (UInt8)
79    pub type_tag: Arc<UInt8Array>,
80    pub formula_id: Option<Arc<UInt32Array>>, // reserved for Phase A+
81    pub meta: ColumnChunkMeta,
82    // Lazy null providers (per-chunk)
83    lazy_null_numbers: OnceCell<Arc<Float64Array>>,
84    lazy_null_booleans: OnceCell<Arc<BooleanArray>>,
85    lazy_null_text: OnceCell<ArrayRef>,
86    lazy_null_errors: OnceCell<Arc<UInt8Array>>,
87    // Cache: lowered text lane (ASCII lower), nulls preserved
88    lowered_text: OnceCell<ArrayRef>,
89    // Phase C: per-chunk overlay (delta edits since last compaction)
90    pub overlay: Overlay,
91    // Phase 0/1: separate computed overlay (formula/spill outputs)
92    pub computed_overlay: Overlay,
93}
94
95impl ColumnChunk {
96    #[inline]
97    pub fn len(&self) -> usize {
98        self.type_tag.len()
99    }
100    #[inline]
101    pub fn is_empty(&self) -> bool {
102        self.len() == 0
103    }
104    #[inline]
105    pub fn numbers_or_null(&self) -> Arc<Float64Array> {
106        if let Some(a) = &self.numbers {
107            return a.clone();
108        }
109        self.lazy_null_numbers
110            .get_or_init(|| {
111                let arr = new_null_array(&DataType::Float64, self.len());
112                Arc::new(arr.as_any().downcast_ref::<Float64Array>().unwrap().clone())
113            })
114            .clone()
115    }
116    #[inline]
117    pub fn booleans_or_null(&self) -> Arc<BooleanArray> {
118        if let Some(a) = &self.booleans {
119            return a.clone();
120        }
121        self.lazy_null_booleans
122            .get_or_init(|| {
123                let arr = new_null_array(&DataType::Boolean, self.len());
124                Arc::new(arr.as_any().downcast_ref::<BooleanArray>().unwrap().clone())
125            })
126            .clone()
127    }
128    #[inline]
129    pub fn errors_or_null(&self) -> Arc<UInt8Array> {
130        if let Some(a) = &self.errors {
131            return a.clone();
132        }
133        self.lazy_null_errors
134            .get_or_init(|| {
135                let arr = new_null_array(&DataType::UInt8, self.len());
136                Arc::new(arr.as_any().downcast_ref::<UInt8Array>().unwrap().clone())
137            })
138            .clone()
139    }
140    #[inline]
141    pub fn text_or_null(&self) -> ArrayRef {
142        if let Some(a) = &self.text {
143            return a.clone();
144        }
145        self.lazy_null_text
146            .get_or_init(|| new_null_array(&DataType::Utf8, self.len()))
147            .clone()
148    }
149
150    /// Lowercased text lane (ASCII lower), with nulls preserved. Cached per chunk.
151    pub fn text_lower_or_null(&self) -> ArrayRef {
152        if let Some(a) = self.lowered_text.get() {
153            return a.clone();
154        }
155        // Lowercase when text present; else return null Utf8
156        let out: ArrayRef = if let Some(txt) = &self.text {
157            let sa = txt.as_any().downcast_ref::<StringArray>().unwrap();
158            let mut b = arrow_array::builder::StringBuilder::with_capacity(sa.len(), sa.len() * 8);
159            for i in 0..sa.len() {
160                if sa.is_null(i) {
161                    b.append_null();
162                } else {
163                    b.append_value(sa.value(i).to_ascii_lowercase());
164                }
165            }
166            let lowered = b.finish();
167            Arc::new(lowered)
168        } else {
169            new_null_array(&DataType::Utf8, self.len())
170        };
171        self.lowered_text.get_or_init(|| out.clone());
172        out
173    }
174}
175
176#[derive(Debug, Clone)]
177pub struct ArrowColumn {
178    pub chunks: Vec<ColumnChunk>,
179    pub index: u32,
180}
181
182#[derive(Debug, Clone)]
183pub struct ArrowSheet {
184    pub name: Arc<str>,
185    pub columns: Vec<ArrowColumn>,
186    pub nrows: u32,
187    pub chunk_starts: Vec<usize>,
188}
189
190#[derive(Debug, Default, Clone)]
191pub struct SheetStore {
192    pub sheets: Vec<ArrowSheet>,
193}
194
195impl SheetStore {
196    pub fn sheet(&self, name: &str) -> Option<&ArrowSheet> {
197        self.sheets.iter().find(|s| s.name.as_ref() == name)
198    }
199    pub fn sheet_mut(&mut self, name: &str) -> Option<&mut ArrowSheet> {
200        self.sheets.iter_mut().find(|s| s.name.as_ref() == name)
201    }
202}
203
204/// Ingestion builder that writes per-column Arrow arrays with a lane/tag design.
205pub struct IngestBuilder {
206    name: Arc<str>,
207    ncols: usize,
208    chunk_rows: usize,
209    date_system: crate::engine::DateSystem,
210
211    // Per-column active builders for current chunk
212    num_builders: Vec<Float64Builder>,
213    bool_builders: Vec<BooleanBuilder>,
214    text_builders: Vec<StringBuilder>,
215    err_builders: Vec<UInt8Builder>,
216    tag_builders: Vec<UInt8Builder>,
217
218    // Per-column per-lane non-null counters for current chunk
219    lane_counts: Vec<LaneCounts>,
220
221    // Accumulated chunks
222    chunks: Vec<Vec<ColumnChunk>>, // indexed by col
223    row_in_chunk: usize,
224    total_rows: u32,
225}
226
/// Non-null entry counters for the four value lanes of one column's in-progress chunk.
#[derive(Clone, Copy, Debug, Default)]
struct LaneCounts {
    /// Entries written to the numeric lane.
    n_num: usize,
    /// Entries written to the boolean lane.
    n_bool: usize,
    /// Entries written to the text lane.
    n_text: usize,
    /// Entries written to the error lane.
    n_err: usize,
}
234
235impl IngestBuilder {
236    pub fn new(
237        sheet_name: &str,
238        ncols: usize,
239        chunk_rows: usize,
240        date_system: crate::engine::DateSystem,
241    ) -> Self {
242        let mut chunks = Vec::with_capacity(ncols);
243        chunks.resize_with(ncols, Vec::new);
244        Self {
245            name: Arc::from(sheet_name.to_string()),
246            ncols,
247            chunk_rows: chunk_rows.max(1),
248            date_system,
249            num_builders: (0..ncols)
250                .map(|_| Float64Builder::with_capacity(chunk_rows))
251                .collect(),
252            bool_builders: (0..ncols)
253                .map(|_| BooleanBuilder::with_capacity(chunk_rows))
254                .collect(),
255            text_builders: (0..ncols)
256                .map(|_| StringBuilder::with_capacity(chunk_rows, chunk_rows * 12))
257                .collect(),
258            err_builders: (0..ncols)
259                .map(|_| UInt8Builder::with_capacity(chunk_rows))
260                .collect(),
261            tag_builders: (0..ncols)
262                .map(|_| UInt8Builder::with_capacity(chunk_rows))
263                .collect(),
264            lane_counts: vec![LaneCounts::default(); ncols],
265            chunks,
266            row_in_chunk: 0,
267            total_rows: 0,
268        }
269    }
270
271    /// Zero-allocation row append from typed cell tokens (no LiteralValue).
272    /// Text borrows are copied into the internal StringBuilder.
273    pub fn append_row_cells<'a>(&mut self, row: &[CellIngest<'a>]) -> Result<(), ExcelError> {
274        assert_eq!(row.len(), self.ncols, "row width mismatch");
275        for (c, cell) in row.iter().enumerate() {
276            match cell {
277                CellIngest::Empty => {
278                    self.tag_builders[c].append_value(TypeTag::Empty as u8);
279                    self.num_builders[c].append_null();
280                    self.bool_builders[c].append_null();
281                    self.text_builders[c].append_null();
282                    self.err_builders[c].append_null();
283                }
284                CellIngest::Number(n) => {
285                    self.tag_builders[c].append_value(TypeTag::Number as u8);
286                    self.num_builders[c].append_value(*n);
287                    self.lane_counts[c].n_num += 1;
288                    self.bool_builders[c].append_null();
289                    self.text_builders[c].append_null();
290                    self.err_builders[c].append_null();
291                }
292                CellIngest::Boolean(b) => {
293                    self.tag_builders[c].append_value(TypeTag::Boolean as u8);
294                    self.num_builders[c].append_null();
295                    self.bool_builders[c].append_value(*b);
296                    self.lane_counts[c].n_bool += 1;
297                    self.text_builders[c].append_null();
298                    self.err_builders[c].append_null();
299                }
300                CellIngest::Text(s) => {
301                    self.tag_builders[c].append_value(TypeTag::Text as u8);
302                    self.num_builders[c].append_null();
303                    self.bool_builders[c].append_null();
304                    self.text_builders[c].append_value(s);
305                    self.lane_counts[c].n_text += 1;
306                    self.err_builders[c].append_null();
307                }
308                CellIngest::ErrorCode(code) => {
309                    self.tag_builders[c].append_value(TypeTag::Error as u8);
310                    self.num_builders[c].append_null();
311                    self.bool_builders[c].append_null();
312                    self.text_builders[c].append_null();
313                    self.err_builders[c].append_value(*code);
314                    self.lane_counts[c].n_err += 1;
315                }
316                CellIngest::DateSerial(serial) => {
317                    self.tag_builders[c].append_value(TypeTag::DateTime as u8);
318                    self.num_builders[c].append_value(*serial);
319                    self.lane_counts[c].n_num += 1;
320                    self.bool_builders[c].append_null();
321                    self.text_builders[c].append_null();
322                    self.err_builders[c].append_null();
323                }
324                CellIngest::Pending => {
325                    self.tag_builders[c].append_value(TypeTag::Pending as u8);
326                    self.num_builders[c].append_null();
327                    self.bool_builders[c].append_null();
328                    self.text_builders[c].append_null();
329                    self.err_builders[c].append_null();
330                }
331            }
332        }
333        self.row_in_chunk += 1;
334        self.total_rows += 1;
335        if self.row_in_chunk >= self.chunk_rows {
336            self.finish_chunk();
337        }
338        Ok(())
339    }
340
341    /// Streaming row append from an iterator of typed cell tokens.
342    /// Requires an `ExactSizeIterator` to validate row width without materializing a Vec.
343    pub fn append_row_cells_iter<'a, I>(&mut self, iter: I) -> Result<(), ExcelError>
344    where
345        I: ExactSizeIterator<Item = CellIngest<'a>>,
346    {
347        assert_eq!(iter.len(), self.ncols, "row width mismatch");
348        for (c, cell) in iter.enumerate() {
349            match cell {
350                CellIngest::Empty => {
351                    self.tag_builders[c].append_value(TypeTag::Empty as u8);
352                    self.num_builders[c].append_null();
353                    self.bool_builders[c].append_null();
354                    self.text_builders[c].append_null();
355                    self.err_builders[c].append_null();
356                }
357                CellIngest::Number(n) => {
358                    self.tag_builders[c].append_value(TypeTag::Number as u8);
359                    self.num_builders[c].append_value(n);
360                    self.lane_counts[c].n_num += 1;
361                    self.bool_builders[c].append_null();
362                    self.text_builders[c].append_null();
363                    self.err_builders[c].append_null();
364                }
365                CellIngest::Boolean(b) => {
366                    self.tag_builders[c].append_value(TypeTag::Boolean as u8);
367                    self.num_builders[c].append_null();
368                    self.bool_builders[c].append_value(b);
369                    self.lane_counts[c].n_bool += 1;
370                    self.text_builders[c].append_null();
371                    self.err_builders[c].append_null();
372                }
373                CellIngest::Text(s) => {
374                    self.tag_builders[c].append_value(TypeTag::Text as u8);
375                    self.num_builders[c].append_null();
376                    self.bool_builders[c].append_null();
377                    self.text_builders[c].append_value(s);
378                    self.lane_counts[c].n_text += 1;
379                    self.err_builders[c].append_null();
380                }
381                CellIngest::ErrorCode(code) => {
382                    self.tag_builders[c].append_value(TypeTag::Error as u8);
383                    self.num_builders[c].append_null();
384                    self.bool_builders[c].append_null();
385                    self.text_builders[c].append_null();
386                    self.err_builders[c].append_value(code);
387                    self.lane_counts[c].n_err += 1;
388                }
389                CellIngest::DateSerial(serial) => {
390                    self.tag_builders[c].append_value(TypeTag::DateTime as u8);
391                    self.num_builders[c].append_value(serial);
392                    self.lane_counts[c].n_num += 1;
393                    self.bool_builders[c].append_null();
394                    self.text_builders[c].append_null();
395                    self.err_builders[c].append_null();
396                }
397                CellIngest::Pending => {
398                    self.tag_builders[c].append_value(TypeTag::Pending as u8);
399                    self.num_builders[c].append_null();
400                    self.bool_builders[c].append_null();
401                    self.text_builders[c].append_null();
402                    self.err_builders[c].append_null();
403                }
404            }
405        }
406        self.row_in_chunk += 1;
407        self.total_rows += 1;
408        if self.row_in_chunk >= self.chunk_rows {
409            self.finish_chunk();
410        }
411        Ok(())
412    }
413
414    /// Append a single row of values. Length must match `ncols`.
415    pub fn append_row(&mut self, row: &[LiteralValue]) -> Result<(), ExcelError> {
416        assert_eq!(row.len(), self.ncols, "row width mismatch");
417
418        for (c, v) in row.iter().enumerate() {
419            let tag = TypeTag::from_value(v) as u8;
420            self.tag_builders[c].append_value(tag);
421
422            match v {
423                LiteralValue::Empty => {
424                    self.num_builders[c].append_null();
425                    self.bool_builders[c].append_null();
426                    self.text_builders[c].append_null();
427                    self.err_builders[c].append_null();
428                }
429                LiteralValue::Int(i) => {
430                    self.num_builders[c].append_value(*i as f64);
431                    self.lane_counts[c].n_num += 1;
432                    self.bool_builders[c].append_null();
433                    self.text_builders[c].append_null();
434                    self.err_builders[c].append_null();
435                }
436                LiteralValue::Number(n) => {
437                    self.num_builders[c].append_value(*n);
438                    self.lane_counts[c].n_num += 1;
439                    self.bool_builders[c].append_null();
440                    self.text_builders[c].append_null();
441                    self.err_builders[c].append_null();
442                }
443                LiteralValue::Boolean(b) => {
444                    self.num_builders[c].append_null();
445                    self.bool_builders[c].append_value(*b);
446                    self.lane_counts[c].n_bool += 1;
447                    self.text_builders[c].append_null();
448                    self.err_builders[c].append_null();
449                }
450                LiteralValue::Text(s) => {
451                    self.num_builders[c].append_null();
452                    self.bool_builders[c].append_null();
453                    self.text_builders[c].append_value(s);
454                    self.lane_counts[c].n_text += 1;
455                    self.err_builders[c].append_null();
456                }
457                LiteralValue::Error(e) => {
458                    self.num_builders[c].append_null();
459                    self.bool_builders[c].append_null();
460                    self.text_builders[c].append_null();
461                    self.err_builders[c].append_value(map_error_code(e.kind));
462                    self.lane_counts[c].n_err += 1;
463                }
464                // Phase A: coerce temporal to serials in numeric lane with DateTime tag
465                LiteralValue::Date(d) => {
466                    let dt = d.and_hms_opt(0, 0, 0).unwrap();
467                    let serial =
468                        crate::builtins::datetime::datetime_to_serial_for(self.date_system, &dt);
469                    self.num_builders[c].append_value(serial);
470                    self.lane_counts[c].n_num += 1;
471                    self.bool_builders[c].append_null();
472                    self.text_builders[c].append_null();
473                    self.err_builders[c].append_null();
474                }
475                LiteralValue::DateTime(dt) => {
476                    let serial =
477                        crate::builtins::datetime::datetime_to_serial_for(self.date_system, dt);
478                    self.num_builders[c].append_value(serial);
479                    self.lane_counts[c].n_num += 1;
480                    self.bool_builders[c].append_null();
481                    self.text_builders[c].append_null();
482                    self.err_builders[c].append_null();
483                }
484                LiteralValue::Time(t) => {
485                    let serial = t.num_seconds_from_midnight() as f64 / 86_400.0;
486                    self.num_builders[c].append_value(serial);
487                    self.lane_counts[c].n_num += 1;
488                    self.bool_builders[c].append_null();
489                    self.text_builders[c].append_null();
490                    self.err_builders[c].append_null();
491                }
492                LiteralValue::Duration(dur) => {
493                    let serial = dur.num_seconds() as f64 / 86_400.0;
494                    self.num_builders[c].append_value(serial);
495                    self.lane_counts[c].n_num += 1;
496                    self.bool_builders[c].append_null();
497                    self.text_builders[c].append_null();
498                    self.err_builders[c].append_null();
499                }
500                LiteralValue::Array(_) => {
501                    // Not allowed as a stored scalar; mark as error kind VALUE
502                    self.num_builders[c].append_null();
503                    self.bool_builders[c].append_null();
504                    self.text_builders[c].append_null();
505                    self.err_builders[c].append_value(map_error_code(ExcelErrorKind::Value));
506                    self.lane_counts[c].n_err += 1;
507                }
508                LiteralValue::Pending => {
509                    // Pending: tag only; all lanes remain null (no error)
510                    self.num_builders[c].append_null();
511                    self.bool_builders[c].append_null();
512                    self.text_builders[c].append_null();
513                    self.err_builders[c].append_null();
514                }
515            }
516        }
517
518        self.row_in_chunk += 1;
519        self.total_rows += 1;
520
521        if self.row_in_chunk >= self.chunk_rows {
522            self.finish_chunk();
523        }
524
525        Ok(())
526    }
527
528    fn finish_chunk(&mut self) {
529        if self.row_in_chunk == 0 {
530            return;
531        }
532        for c in 0..self.ncols {
533            let len = self.row_in_chunk;
534            let numbers_arc: Option<Arc<Float64Array>> = if self.lane_counts[c].n_num == 0 {
535                None
536            } else {
537                Some(Arc::new(self.num_builders[c].finish()))
538            };
539            let booleans_arc: Option<Arc<BooleanArray>> = if self.lane_counts[c].n_bool == 0 {
540                None
541            } else {
542                Some(Arc::new(self.bool_builders[c].finish()))
543            };
544            let text_ref: Option<ArrayRef> = if self.lane_counts[c].n_text == 0 {
545                None
546            } else {
547                Some(Arc::new(self.text_builders[c].finish()))
548            };
549            let errors_arc: Option<Arc<UInt8Array>> = if self.lane_counts[c].n_err == 0 {
550                None
551            } else {
552                Some(Arc::new(self.err_builders[c].finish()))
553            };
554            let tags: UInt8Array = self.tag_builders[c].finish();
555
556            let chunk = ColumnChunk {
557                numbers: numbers_arc,
558                booleans: booleans_arc,
559                text: text_ref,
560                errors: errors_arc,
561                type_tag: Arc::new(tags),
562                formula_id: None,
563                meta: ColumnChunkMeta {
564                    len,
565                    non_null_num: self.lane_counts[c].n_num,
566                    non_null_bool: self.lane_counts[c].n_bool,
567                    non_null_text: self.lane_counts[c].n_text,
568                    non_null_err: self.lane_counts[c].n_err,
569                },
570                lazy_null_numbers: OnceCell::new(),
571                lazy_null_booleans: OnceCell::new(),
572                lazy_null_text: OnceCell::new(),
573                lazy_null_errors: OnceCell::new(),
574                lowered_text: OnceCell::new(),
575                overlay: Overlay::new(),
576                computed_overlay: Overlay::new(),
577            };
578            self.chunks[c].push(chunk);
579
580            // re-init builders for next chunk
581            self.num_builders[c] = Float64Builder::with_capacity(self.chunk_rows);
582            self.bool_builders[c] = BooleanBuilder::with_capacity(self.chunk_rows);
583            self.text_builders[c] =
584                StringBuilder::with_capacity(self.chunk_rows, self.chunk_rows * 12);
585            self.err_builders[c] = UInt8Builder::with_capacity(self.chunk_rows);
586            self.tag_builders[c] = UInt8Builder::with_capacity(self.chunk_rows);
587            self.lane_counts[c] = LaneCounts::default();
588        }
589        self.row_in_chunk = 0;
590    }
591
592    pub fn finish(mut self) -> ArrowSheet {
593        // flush partial chunk
594        if self.row_in_chunk > 0 {
595            self.finish_chunk();
596        }
597
598        let mut columns = Vec::with_capacity(self.ncols);
599        for (idx, chunks) in self.chunks.into_iter().enumerate() {
600            columns.push(ArrowColumn {
601                chunks,
602                index: idx as u32,
603            });
604        }
605        // Precompute chunk starts from first column and enforce alignment across columns
606        let mut chunk_starts: Vec<usize> = Vec::new();
607        if let Some(col0) = columns.first() {
608            let chunks_len0 = col0.chunks.len();
609            for (ci, col) in columns.iter().enumerate() {
610                if col.chunks.len() != chunks_len0 {
611                    panic!(
612                        "ArrowSheet chunk misalignment: column {} chunks={} != {}",
613                        ci,
614                        col.chunks.len(),
615                        chunks_len0
616                    );
617                }
618            }
619            let mut cur = 0usize;
620            for i in 0..chunks_len0 {
621                let len_i = col0.chunks[i].type_tag.len();
622                for (ci, col) in columns.iter().enumerate() {
623                    let got = col.chunks[i].type_tag.len();
624                    if got != len_i {
625                        panic!(
626                            "ArrowSheet chunk row-length misalignment at chunk {i}: col {ci} len={got} != {len_i}"
627                        );
628                    }
629                }
630                chunk_starts.push(cur);
631                cur += len_i;
632            }
633        }
634        ArrowSheet {
635            name: self.name,
636            columns,
637            nrows: self.total_rows,
638            chunk_starts,
639        }
640    }
641}
642
643pub fn map_error_code(kind: ExcelErrorKind) -> u8 {
644    match kind {
645        ExcelErrorKind::Null => 1,
646        ExcelErrorKind::Ref => 2,
647        ExcelErrorKind::Name => 3,
648        ExcelErrorKind::Value => 4,
649        ExcelErrorKind::Div => 5,
650        ExcelErrorKind::Na => 6,
651        ExcelErrorKind::Num => 7,
652        ExcelErrorKind::Error => 8,
653        ExcelErrorKind::NImpl => 9,
654        ExcelErrorKind::Spill => 10,
655        ExcelErrorKind::Calc => 11,
656        ExcelErrorKind::Circ => 12,
657        ExcelErrorKind::Cancelled => 13,
658    }
659}
660
661pub fn unmap_error_code(code: u8) -> ExcelErrorKind {
662    match code {
663        1 => ExcelErrorKind::Null,
664        2 => ExcelErrorKind::Ref,
665        3 => ExcelErrorKind::Name,
666        4 => ExcelErrorKind::Value,
667        5 => ExcelErrorKind::Div,
668        6 => ExcelErrorKind::Na,
669        7 => ExcelErrorKind::Num,
670        8 => ExcelErrorKind::Error,
671        9 => ExcelErrorKind::NImpl,
672        10 => ExcelErrorKind::Spill,
673        11 => ExcelErrorKind::Calc,
674        12 => ExcelErrorKind::Circ,
675        13 => ExcelErrorKind::Cancelled,
676        _ => ExcelErrorKind::Error,
677    }
678}
679
680// ─────────────────────────── Overlay (Phase C) ────────────────────────────
681
/// Zero-allocation cell token for ingestion.
///
/// Every payload is `Copy` (text is borrowed), so the token itself is cheap to copy,
/// compare and print.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CellIngest<'a> {
    /// No value.
    Empty,
    /// Numeric value.
    Number(f64),
    /// Boolean value.
    Boolean(bool),
    /// Borrowed text; copied into the builder on append.
    Text(&'a str),
    /// Compact error code (see `map_error_code`).
    ErrorCode(u8),
    /// Date/time already converted to a serial number.
    DateSerial(f64),
    /// Value not yet available.
    Pending,
}
692
/// A single overlaid cell value.
#[derive(Debug, Clone)]
pub enum OverlayValue {
    /// Cell cleared.
    Empty,
    /// Numeric value.
    Number(f64),
    /// Boolean value.
    Boolean(bool),
    /// Shared text value.
    Text(Arc<str>),
    /// Compact error code.
    Error(u8),
    /// Value not yet available.
    Pending,
}

/// Sparse map from offset to an overlaid value.
#[derive(Debug, Default, Clone)]
pub struct Overlay {
    map: HashMap<usize, OverlayValue>,
}

impl Overlay {
    /// Create an empty overlay.
    pub fn new() -> Self {
        Self::default()
    }

    /// Value overlaid at `off`, if any.
    #[inline]
    pub fn get(&self, off: usize) -> Option<&OverlayValue> {
        self.map.get(&off)
    }

    /// Overlay `v` at `off`, replacing any previous entry for that offset.
    #[inline]
    pub fn set(&mut self, off: usize, v: OverlayValue) {
        self.map.insert(off, v);
    }

    /// Remove every entry.
    #[inline]
    pub fn clear(&mut self) {
        self.map.clear();
    }

    /// Number of overlaid offsets.
    #[inline]
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// `true` when nothing is overlaid.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// `true` when at least one overlaid offset lies inside `range`.
    ///
    /// Empty (or reversed) ranges and empty overlays answer `false` immediately.
    /// Otherwise the cheaper side is walked: the range is probed offset by offset when
    /// it is no longer than the map, and the map's keys are scanned when it is longer.
    #[inline]
    pub fn any_in_range(&self, range: core::ops::Range<usize>) -> bool {
        if range.is_empty() || self.map.is_empty() {
            return false;
        }
        if range.len() <= self.map.len() {
            range.into_iter().any(|off| self.map.contains_key(&off))
        } else {
            self.map.keys().any(|k| range.contains(k))
        }
    }
}
739
740/// A lightweight view over a rectangular range in an `ArrowSheet`.
741/// Coordinates are 0-based and inclusive.
742pub struct ArrowRangeView<'a> {
743    sheet: &'a ArrowSheet,
744    sr: usize,
745    sc: usize,
746    er: usize,
747    ec: usize,
748    rows: usize,
749    cols: usize,
750    chunk_starts: &'a [usize],
751}
752
753impl ArrowSheet {
754    /// Return a summary of each column's chunk counts, total rows, and lane presence.
755    pub fn shape(&self) -> Vec<ColumnShape> {
756        self.columns
757            .iter()
758            .map(|c| {
759                let chunks = c.chunks.len();
760                let rows = self.nrows as usize;
761                let has_num = c.chunks.iter().any(|ch| ch.meta.non_null_num > 0);
762                let has_bool = c.chunks.iter().any(|ch| ch.meta.non_null_bool > 0);
763                let has_text = c.chunks.iter().any(|ch| ch.meta.non_null_text > 0);
764                let has_err = c.chunks.iter().any(|ch| ch.meta.non_null_err > 0);
765                ColumnShape {
766                    index: c.index,
767                    chunks,
768                    rows,
769                    has_num,
770                    has_bool,
771                    has_text,
772                    has_err,
773                }
774            })
775            .collect()
776    }
777    pub fn range_view(&self, sr: usize, sc: usize, er: usize, ec: usize) -> ArrowRangeView<'_> {
778        let r0 = er.checked_sub(sr).map(|d| d + 1).unwrap_or(0);
779        let c0 = ec.checked_sub(sc).map(|d| d + 1).unwrap_or(0);
780        let (rows, cols) = if r0 == 0 || c0 == 0 { (0, 0) } else { (r0, c0) };
781        ArrowRangeView {
782            sheet: self,
783            sr,
784            sc,
785            er,
786            ec,
787            rows,
788            cols,
789            chunk_starts: &self.chunk_starts,
790        }
791    }
792
793    /// Ensure capacity to address at least target_rows rows by appending empty chunks.
794    pub fn ensure_row_capacity(&mut self, target_rows: usize) {
795        if target_rows as u32 <= self.nrows {
796            return;
797        }
798        // Determine chunk size from last chunk, fallback to 32k
799        let mut chunk_size = 32 * 1024;
800        if let Some(c0) = self.columns.first()
801            && let Some(last) = c0.chunks.last()
802        {
803            chunk_size = last.type_tag.len().max(1);
804        }
805        let mut cur_rows = self.nrows as usize;
806        while cur_rows < target_rows {
807            let len = (target_rows - cur_rows).min(chunk_size);
808            // Append chunk_starts entry
809            self.chunk_starts.push(cur_rows);
810            for col in &mut self.columns {
811                let tags = UInt8Array::from(vec![TypeTag::Empty as u8; len]);
812                col.chunks.push(ColumnChunk {
813                    numbers: None,
814                    booleans: None,
815                    text: None,
816                    errors: None,
817                    type_tag: Arc::new(tags),
818                    formula_id: None,
819                    meta: ColumnChunkMeta {
820                        len,
821                        non_null_num: 0,
822                        non_null_bool: 0,
823                        non_null_text: 0,
824                        non_null_err: 0,
825                    },
826                    lazy_null_numbers: OnceCell::new(),
827                    lazy_null_booleans: OnceCell::new(),
828                    lazy_null_text: OnceCell::new(),
829                    lazy_null_errors: OnceCell::new(),
830                    lowered_text: OnceCell::new(),
831                    overlay: Overlay::new(),
832                    computed_overlay: Overlay::new(),
833                });
834            }
835            cur_rows += len;
836            self.nrows = cur_rows as u32;
837        }
838    }
839
840    /// Return (chunk_idx, in_chunk_offset) for absolute 0-based row.
841    pub fn chunk_of_row(&self, abs_row: usize) -> Option<(usize, usize)> {
842        if abs_row >= self.nrows as usize {
843            return None;
844        }
845        let ch_idx = match self.chunk_starts.binary_search(&abs_row) {
846            Ok(i) => i,
847            Err(0) => 0,
848            Err(i) => i - 1,
849        };
850        let start = self.chunk_starts[ch_idx];
851        Some((ch_idx, abs_row - start))
852    }
853
854    fn recompute_chunk_starts(&mut self) {
855        self.chunk_starts.clear();
856        if let Some(col0) = self.columns.first() {
857            let mut cur = 0usize;
858            for ch in &col0.chunks {
859                self.chunk_starts.push(cur);
860                cur += ch.type_tag.len();
861            }
862        }
863    }
864
865    fn make_empty_chunk(len: usize) -> ColumnChunk {
866        ColumnChunk {
867            numbers: None,
868            booleans: None,
869            text: None,
870            errors: None,
871            type_tag: Arc::new(UInt8Array::from(vec![TypeTag::Empty as u8; len])),
872            formula_id: None,
873            meta: ColumnChunkMeta {
874                len,
875                non_null_num: 0,
876                non_null_bool: 0,
877                non_null_text: 0,
878                non_null_err: 0,
879            },
880            lazy_null_numbers: OnceCell::new(),
881            lazy_null_booleans: OnceCell::new(),
882            lazy_null_text: OnceCell::new(),
883            lazy_null_errors: OnceCell::new(),
884            lowered_text: OnceCell::new(),
885            overlay: Overlay::new(),
886            computed_overlay: Overlay::new(),
887        }
888    }
889
890    fn slice_chunk(ch: &ColumnChunk, off: usize, len: usize) -> ColumnChunk {
891        // Slice type tags
892        use arrow_array::Array;
893        let type_tag: Arc<UInt8Array> = Arc::new(
894            Array::slice(ch.type_tag.as_ref(), off, len)
895                .as_any()
896                .downcast_ref::<UInt8Array>()
897                .unwrap()
898                .clone(),
899        );
900        // Slice numbers if present and keep only if any non-null
901        let numbers: Option<Arc<Float64Array>> = ch.numbers.as_ref().and_then(|a| {
902            let sl = Array::slice(a.as_ref(), off, len);
903            let fa = sl.as_any().downcast_ref::<Float64Array>().unwrap().clone();
904            let nn = len.saturating_sub(fa.null_count());
905            if nn == 0 { None } else { Some(Arc::new(fa)) }
906        });
907        let booleans: Option<Arc<BooleanArray>> = ch.booleans.as_ref().and_then(|a| {
908            let sl = Array::slice(a.as_ref(), off, len);
909            let ba = sl.as_any().downcast_ref::<BooleanArray>().unwrap().clone();
910            let nn = len.saturating_sub(ba.null_count());
911            if nn == 0 { None } else { Some(Arc::new(ba)) }
912        });
913        let text: Option<ArrayRef> = ch.text.as_ref().and_then(|a| {
914            let sl = Array::slice(a.as_ref(), off, len);
915            let sa = sl.as_any().downcast_ref::<StringArray>().unwrap().clone();
916            let nn = len.saturating_sub(sa.null_count());
917            if nn == 0 {
918                None
919            } else {
920                Some(Arc::new(sa) as ArrayRef)
921            }
922        });
923        let errors: Option<Arc<UInt8Array>> = ch.errors.as_ref().and_then(|a| {
924            let sl = Array::slice(a.as_ref(), off, len);
925            let ea = sl.as_any().downcast_ref::<UInt8Array>().unwrap().clone();
926            let nn = len.saturating_sub(ea.null_count());
927            if nn == 0 { None } else { Some(Arc::new(ea)) }
928        });
929        // Split overlays for this slice
930        let mut overlay = Overlay::new();
931        for (k, v) in ch.overlay.map.iter() {
932            if *k >= off && *k < off + len {
933                overlay.set(*k - off, v.clone());
934            }
935        }
936        let mut computed_overlay = Overlay::new();
937        for (k, v) in ch.computed_overlay.map.iter() {
938            if *k >= off && *k < off + len {
939                computed_overlay.set(*k - off, v.clone());
940            }
941        }
942        let non_null_num = numbers.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
943        let non_null_bool = booleans.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
944        let non_null_text = text.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
945        let non_null_err = errors.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
946        ColumnChunk {
947            numbers: numbers.clone(),
948            booleans: booleans.clone(),
949            text: text.clone(),
950            errors: errors.clone(),
951            type_tag,
952            formula_id: None,
953            meta: ColumnChunkMeta {
954                len,
955                non_null_num,
956                non_null_bool,
957                non_null_text,
958                non_null_err,
959            },
960            lazy_null_numbers: OnceCell::new(),
961            lazy_null_booleans: OnceCell::new(),
962            lazy_null_text: OnceCell::new(),
963            lazy_null_errors: OnceCell::new(),
964            lowered_text: OnceCell::new(),
965            overlay,
966            computed_overlay,
967        }
968    }
969
970    /// Heuristic compaction: rebuilds a chunk's base arrays by applying its overlay when
971    /// overlay density crosses thresholds. Returns true if a rebuild occurred.
972    pub fn maybe_compact_chunk(
973        &mut self,
974        col_idx: usize,
975        ch_idx: usize,
976        abs_threshold: usize,
977        frac_den: usize,
978    ) -> bool {
979        if col_idx >= self.columns.len() || self.columns[col_idx].chunks.len() <= ch_idx {
980            return false;
981        }
982        let ch = &self.columns[col_idx].chunks[ch_idx];
983        let len = ch.type_tag.len();
984        if len == 0 {
985            return false;
986        }
987        let ov_len = ch.overlay.len();
988        let den = if frac_den.max(1) == 0 {
989            1
990        } else {
991            frac_den.max(1)
992        };
993        let trig = ov_len > (len / den) || ov_len > abs_threshold;
994        if !trig {
995            return false;
996        }
997        // Rebuild: merge base lanes with overlays row-by-row
998        let mut tag_b = UInt8Builder::with_capacity(len);
999        let mut nb = Float64Builder::with_capacity(len);
1000        let mut bb = BooleanBuilder::with_capacity(len);
1001        let mut sb = StringBuilder::with_capacity(len, len * 8);
1002        let mut eb = UInt8Builder::with_capacity(len);
1003        let mut non_num = 0usize;
1004        let mut non_bool = 0usize;
1005        let mut non_text = 0usize;
1006        let mut non_err = 0usize;
1007
1008        let ch_ref = &self.columns[col_idx].chunks[ch_idx];
1009        for i in 0..len {
1010            // If overlay present, use it. Otherwise, use base tag+lane
1011            if let Some(ov) = ch_ref.overlay.get(i) {
1012                match ov {
1013                    OverlayValue::Empty => {
1014                        tag_b.append_value(TypeTag::Empty as u8);
1015                        nb.append_null();
1016                        bb.append_null();
1017                        sb.append_null();
1018                        eb.append_null();
1019                    }
1020                    OverlayValue::Number(n) => {
1021                        tag_b.append_value(TypeTag::Number as u8);
1022                        nb.append_value(*n);
1023                        non_num += 1;
1024                        bb.append_null();
1025                        sb.append_null();
1026                        eb.append_null();
1027                    }
1028                    OverlayValue::Boolean(b) => {
1029                        tag_b.append_value(TypeTag::Boolean as u8);
1030                        nb.append_null();
1031                        bb.append_value(*b);
1032                        non_bool += 1;
1033                        sb.append_null();
1034                        eb.append_null();
1035                    }
1036                    OverlayValue::Text(s) => {
1037                        tag_b.append_value(TypeTag::Text as u8);
1038                        nb.append_null();
1039                        bb.append_null();
1040                        sb.append_value(s);
1041                        non_text += 1;
1042                        eb.append_null();
1043                    }
1044                    OverlayValue::Error(code) => {
1045                        tag_b.append_value(TypeTag::Error as u8);
1046                        nb.append_null();
1047                        bb.append_null();
1048                        sb.append_null();
1049                        eb.append_value(*code);
1050                        non_err += 1;
1051                    }
1052                    OverlayValue::Pending => {
1053                        tag_b.append_value(TypeTag::Pending as u8);
1054                        nb.append_null();
1055                        bb.append_null();
1056                        sb.append_null();
1057                        eb.append_null();
1058                    }
1059                }
1060            } else {
1061                let tag = TypeTag::from_u8(ch_ref.type_tag.value(i));
1062                match tag {
1063                    TypeTag::Empty => {
1064                        tag_b.append_value(TypeTag::Empty as u8);
1065                        nb.append_null();
1066                        bb.append_null();
1067                        sb.append_null();
1068                        eb.append_null();
1069                    }
1070                    TypeTag::Number | TypeTag::DateTime | TypeTag::Duration => {
1071                        tag_b.append_value(TypeTag::Number as u8);
1072                        if let Some(a) = &ch_ref.numbers {
1073                            let fa = a.as_any().downcast_ref::<Float64Array>().unwrap();
1074                            if fa.is_null(i) {
1075                                nb.append_null();
1076                            } else {
1077                                nb.append_value(fa.value(i));
1078                                non_num += 1;
1079                            }
1080                        } else {
1081                            nb.append_null();
1082                        }
1083                        bb.append_null();
1084                        sb.append_null();
1085                        eb.append_null();
1086                    }
1087                    TypeTag::Boolean => {
1088                        tag_b.append_value(TypeTag::Boolean as u8);
1089                        nb.append_null();
1090                        if let Some(a) = &ch_ref.booleans {
1091                            let ba = a.as_any().downcast_ref::<BooleanArray>().unwrap();
1092                            if ba.is_null(i) {
1093                                bb.append_null();
1094                            } else {
1095                                bb.append_value(ba.value(i));
1096                                non_bool += 1;
1097                            }
1098                        } else {
1099                            bb.append_null();
1100                        }
1101                        sb.append_null();
1102                        eb.append_null();
1103                    }
1104                    TypeTag::Text => {
1105                        tag_b.append_value(TypeTag::Text as u8);
1106                        nb.append_null();
1107                        bb.append_null();
1108                        if let Some(a) = &ch_ref.text {
1109                            let sa = a.as_any().downcast_ref::<StringArray>().unwrap();
1110                            if sa.is_null(i) {
1111                                sb.append_null();
1112                            } else {
1113                                sb.append_value(sa.value(i));
1114                                non_text += 1;
1115                            }
1116                        } else {
1117                            sb.append_null();
1118                        }
1119                        eb.append_null();
1120                    }
1121                    TypeTag::Error => {
1122                        tag_b.append_value(TypeTag::Error as u8);
1123                        nb.append_null();
1124                        bb.append_null();
1125                        sb.append_null();
1126                        if let Some(a) = &ch_ref.errors {
1127                            let ea = a.as_any().downcast_ref::<UInt8Array>().unwrap();
1128                            if ea.is_null(i) {
1129                                eb.append_null();
1130                            } else {
1131                                eb.append_value(ea.value(i));
1132                                non_err += 1;
1133                            }
1134                        } else {
1135                            eb.append_null();
1136                        }
1137                    }
1138                    TypeTag::Pending => {
1139                        tag_b.append_value(TypeTag::Pending as u8);
1140                        nb.append_null();
1141                        bb.append_null();
1142                        sb.append_null();
1143                        eb.append_null();
1144                    }
1145                }
1146            }
1147        }
1148        let tags = Arc::new(tag_b.finish());
1149        let numbers = {
1150            let a = nb.finish();
1151            if non_num == 0 {
1152                None
1153            } else {
1154                Some(Arc::new(a))
1155            }
1156        };
1157        let booleans = {
1158            let a = bb.finish();
1159            if non_bool == 0 {
1160                None
1161            } else {
1162                Some(Arc::new(a))
1163            }
1164        };
1165        let text = {
1166            let a = sb.finish();
1167            if non_text == 0 {
1168                None
1169            } else {
1170                Some(Arc::new(a) as ArrayRef)
1171            }
1172        };
1173        let errors = {
1174            let a = eb.finish();
1175            if non_err == 0 {
1176                None
1177            } else {
1178                Some(Arc::new(a))
1179            }
1180        };
1181        // Swap in rebuilt chunk and clear overlay
1182        let ch_mut = &mut self.columns[col_idx].chunks[ch_idx];
1183        ch_mut.type_tag = tags;
1184        ch_mut.numbers = numbers;
1185        ch_mut.booleans = booleans;
1186        ch_mut.text = text;
1187        ch_mut.errors = errors;
1188        ch_mut.overlay.clear();
1189        ch_mut.meta.len = len;
1190        ch_mut.meta.non_null_num = non_num;
1191        ch_mut.meta.non_null_bool = non_bool;
1192        ch_mut.meta.non_null_text = non_text;
1193        ch_mut.meta.non_null_err = non_err;
1194        true
1195    }
1196
1197    /// Insert `count` rows before absolute 0-based row `before`.
1198    pub fn insert_rows(&mut self, before: usize, count: usize) {
1199        if count == 0 {
1200            return;
1201        }
1202        if self.columns.is_empty() {
1203            // No columns: just extend nrows
1204            self.nrows = self.nrows.saturating_add(count as u32);
1205            return;
1206        }
1207        let total_rows = self.nrows as usize;
1208        let insert_at = before.min(total_rows);
1209        // Locate split chunk and offset
1210        let (ch_idx, in_off) = if insert_at == total_rows && !self.chunk_starts.is_empty() {
1211            // Append after last row: split after last chunk
1212            let last_idx = self.chunk_starts.len() - 1;
1213            let last_len = self.columns[0].chunks[last_idx].type_tag.len();
1214            (last_idx, last_len)
1215        } else {
1216            self.chunk_of_row(insert_at).unwrap_or((0, 0))
1217        };
1218        // Rebuild chunks for each column
1219        for col in &mut self.columns {
1220            let mut new_chunks: Vec<ColumnChunk> = Vec::with_capacity(col.chunks.len() + 1);
1221            for i in 0..col.chunks.len() {
1222                if i != ch_idx {
1223                    new_chunks.push(col.chunks[i].clone());
1224                } else {
1225                    let orig = &col.chunks[i];
1226                    let len = orig.type_tag.len();
1227                    if in_off > 0 {
1228                        new_chunks.push(Self::slice_chunk(orig, 0, in_off));
1229                    }
1230                    new_chunks.push(Self::make_empty_chunk(count));
1231                    if in_off < len {
1232                        new_chunks.push(Self::slice_chunk(orig, in_off, len - in_off));
1233                    }
1234                }
1235            }
1236            col.chunks = new_chunks;
1237        }
1238        self.nrows = (total_rows + count) as u32;
1239        self.recompute_chunk_starts();
1240    }
1241
1242    /// Delete `count` rows starting from absolute 0-based row `start`.
1243    pub fn delete_rows(&mut self, start: usize, count: usize) {
1244        if count == 0 || self.columns.is_empty() || self.nrows == 0 {
1245            return;
1246        }
1247        let total_rows = self.nrows as usize;
1248        if start >= total_rows {
1249            return;
1250        }
1251        let end = (start + count).min(total_rows);
1252        // For each column rebuild chunk list by slicing out deleted window
1253        for col in &mut self.columns {
1254            let mut new_chunks: Vec<ColumnChunk> = Vec::new();
1255            let mut cur_start = 0usize;
1256            for ch in &col.chunks {
1257                let len = ch.type_tag.len();
1258                let ch_end = cur_start + len;
1259                // No overlap
1260                if ch_end <= start || cur_start >= end {
1261                    new_chunks.push(ch.clone());
1262                } else {
1263                    // Overlap exists
1264                    let del_start = start.max(cur_start);
1265                    let del_end = end.min(ch_end);
1266                    let left_len = del_start.saturating_sub(cur_start);
1267                    let right_len = ch_end.saturating_sub(del_end);
1268                    if left_len > 0 {
1269                        new_chunks.push(Self::slice_chunk(ch, 0, left_len));
1270                    }
1271                    if right_len > 0 {
1272                        let off = len - right_len;
1273                        new_chunks.push(Self::slice_chunk(ch, off, right_len));
1274                    }
1275                }
1276                cur_start = ch_end;
1277            }
1278            col.chunks = new_chunks;
1279        }
1280        self.nrows = (total_rows - (end - start)) as u32;
1281        self.recompute_chunk_starts();
1282    }
1283
1284    /// Insert `count` columns before absolute 0-based column `before` with empty chunks.
1285    pub fn insert_columns(&mut self, before: usize, count: usize) {
1286        if count == 0 {
1287            return;
1288        }
1289        // Determine chunk schema from first column if present
1290        let empty_col = |lens: &[usize]| -> ArrowColumn {
1291            let mut chunks = Vec::with_capacity(lens.len());
1292            for &l in lens {
1293                chunks.push(Self::make_empty_chunk(l));
1294            }
1295            ArrowColumn { chunks, index: 0 }
1296        };
1297        let lens: Vec<usize> = if let Some(col0) = self.columns.first() {
1298            col0.chunks.iter().map(|c| c.type_tag.len()).collect()
1299        } else {
1300            // No columns: single chunk matching nrows if any
1301            if self.nrows > 0 {
1302                vec![self.nrows as usize]
1303            } else {
1304                Vec::new()
1305            }
1306        };
1307        let mut cols_new: Vec<ArrowColumn> = Vec::with_capacity(self.columns.len() + count);
1308        let before_idx = before.min(self.columns.len());
1309        for (i, col) in self.columns.iter_mut().enumerate() {
1310            if i == before_idx {
1311                for _ in 0..count {
1312                    cols_new.push(empty_col(&lens));
1313                }
1314            }
1315            cols_new.push(col.clone());
1316        }
1317        if before_idx == self.columns.len() {
1318            for _ in 0..count {
1319                cols_new.push(empty_col(&lens));
1320            }
1321        }
1322        // Fix column indices
1323        for (idx, col) in cols_new.iter_mut().enumerate() {
1324            col.index = idx as u32;
1325        }
1326        self.columns = cols_new;
1327        // chunk_starts unchanged; lens were matched
1328    }
1329
1330    /// Delete `count` columns starting at absolute 0-based column `start`.
1331    pub fn delete_columns(&mut self, start: usize, count: usize) {
1332        if count == 0 || self.columns.is_empty() {
1333            return;
1334        }
1335        let end = (start + count).min(self.columns.len());
1336        if start >= end {
1337            return;
1338        }
1339        self.columns.drain(start..end);
1340        for (idx, col) in self.columns.iter_mut().enumerate() {
1341            col.index = idx as u32;
1342        }
1343    }
1344}
1345
/// Per-column summary produced by `ArrowSheet::shape`.
#[derive(Debug, Clone, Copy)]
pub struct ColumnShape {
    /// The column's stored index.
    pub index: u32,
    /// Number of chunks in the column.
    pub chunks: usize,
    /// Row count of the owning sheet (identical for every column of one sheet).
    pub rows: usize,
    /// True when some chunk's metadata reports a non-null number.
    pub has_num: bool,
    /// True when some chunk's metadata reports a non-null boolean.
    pub has_bool: bool,
    /// True when some chunk's metadata reports a non-null text value.
    pub has_text: bool,
    /// True when some chunk's metadata reports a non-null error code.
    pub has_err: bool,
}
1356
1357impl<'a> ArrowRangeView<'a> {
    /// Absolute 0-based start row of this view (the stored value, returned as is).
    pub fn start_row(&self) -> usize {
        self.sr
    }
    /// Absolute 0-based end row of this view (inclusive; the stored value, returned as is).
    pub fn end_row(&self) -> usize {
        self.er
    }
    /// Absolute 0-based start column of this view (the stored value, returned as is).
    pub fn start_col(&self) -> usize {
        self.sc
    }
    /// Absolute 0-based end column of this view (inclusive; the stored value, returned as is).
    pub fn end_col(&self) -> usize {
        self.ec
    }
    /// Name of the sheet this view reads from.
    pub fn sheet_name(&self) -> &str {
        &self.sheet.name
    }
    /// Dimensions of this view as `(rows, cols)`.
    #[inline]
    pub fn dims(&self) -> (usize, usize) {
        (self.rows, self.cols)
    }
1382
1383    /// Returns a single cell value relative to this view (row/col 0-based).
1384    /// OOB returns Empty. Phase A: Date/Time/Duration come back as Number
1385    /// with the corresponding TypeTag preserved for higher layers.
1386    pub fn get_cell(&self, row: usize, col: usize) -> LiteralValue {
1387        if row >= self.rows || col >= self.cols {
1388            return LiteralValue::Empty;
1389        }
1390        let abs_row = self.sr + row;
1391        let abs_col = self.sc + col;
1392        let sheet_rows = self.sheet.nrows as usize;
1393        if abs_row >= sheet_rows {
1394            return LiteralValue::Empty;
1395        }
1396        if abs_col >= self.sheet.columns.len() {
1397            return LiteralValue::Empty;
1398        }
1399        let col_ref = &self.sheet.columns[abs_col];
1400        // Locate chunk by binary searching start offsets
1401        let ch_idx = match self.chunk_starts.binary_search(&abs_row) {
1402            Ok(i) => i,
1403            Err(0) => 0,
1404            Err(i) => i - 1,
1405        };
1406        if ch_idx >= col_ref.chunks.len() {
1407            return LiteralValue::Empty;
1408        }
1409        let ch = &col_ref.chunks[ch_idx];
1410        let row_start = self.chunk_starts[ch_idx];
1411        let in_off = abs_row - row_start;
1412        // Overlay takes precedence: user edits over computed over base.
1413        if let Some(ov) = ch
1414            .overlay
1415            .get(in_off)
1416            .or_else(|| ch.computed_overlay.get(in_off))
1417        {
1418            return match ov {
1419                OverlayValue::Empty => LiteralValue::Empty,
1420                OverlayValue::Number(n) => LiteralValue::Number(*n),
1421                OverlayValue::Boolean(b) => LiteralValue::Boolean(*b),
1422                OverlayValue::Text(s) => LiteralValue::Text((**s).to_string()),
1423                OverlayValue::Error(code) => {
1424                    let kind = unmap_error_code(*code);
1425                    LiteralValue::Error(ExcelError::new(kind))
1426                }
1427                OverlayValue::Pending => LiteralValue::Pending,
1428            };
1429        }
1430        // Read tag and route to lane
1431        let tag_u8 = ch.type_tag.value(in_off);
1432        match TypeTag::from_u8(tag_u8) {
1433            TypeTag::Empty => LiteralValue::Empty,
1434            TypeTag::Number | TypeTag::DateTime | TypeTag::Duration => {
1435                if let Some(arr) = &ch.numbers {
1436                    if arr.is_null(in_off) {
1437                        return LiteralValue::Empty;
1438                    }
1439                    let nums = arr.as_any().downcast_ref::<Float64Array>().unwrap();
1440                    LiteralValue::Number(nums.value(in_off))
1441                } else {
1442                    LiteralValue::Empty
1443                }
1444            }
1445            TypeTag::Boolean => {
1446                if let Some(arr) = &ch.booleans {
1447                    if arr.is_null(in_off) {
1448                        return LiteralValue::Empty;
1449                    }
1450                    let ba = arr.as_any().downcast_ref::<BooleanArray>().unwrap();
1451                    LiteralValue::Boolean(ba.value(in_off))
1452                } else {
1453                    LiteralValue::Empty
1454                }
1455            }
1456            TypeTag::Text => {
1457                if let Some(arr) = &ch.text {
1458                    if arr.is_null(in_off) {
1459                        return LiteralValue::Empty;
1460                    }
1461                    let sa = arr.as_any().downcast_ref::<StringArray>().unwrap();
1462                    LiteralValue::Text(sa.value(in_off).to_string())
1463                } else {
1464                    LiteralValue::Empty
1465                }
1466            }
1467            TypeTag::Error => {
1468                if let Some(arr) = &ch.errors {
1469                    if arr.is_null(in_off) {
1470                        return LiteralValue::Empty;
1471                    }
1472                    let ea = arr.as_any().downcast_ref::<UInt8Array>().unwrap();
1473                    let kind = unmap_error_code(ea.value(in_off));
1474                    LiteralValue::Error(ExcelError::new(kind))
1475                } else {
1476                    LiteralValue::Empty
1477                }
1478            }
1479            TypeTag::Pending => LiteralValue::Pending,
1480        }
1481    }
1482
1483    /// Row-aligned chunk slices within this view. Each item represents
1484    /// a contiguous row segment that lies fully within a single row chunk.
1485    pub fn row_chunk_slices(&self) -> Vec<ChunkSlice> {
1486        let mut out = Vec::new();
1487        if self.rows == 0 || self.cols == 0 {
1488            return out;
1489        }
1490        // Iterate overlapping chunks by row using first column's chunk map
1491        let sheet_rows = self.sheet.nrows as usize;
1492        let row_end = self.er.min(sheet_rows.saturating_sub(1));
1493        if self.chunk_starts.is_empty() {
1494            return out;
1495        }
1496        // For each chunk, compute intersection with [sr..=row_end]
1497        for (ci, &start) in self.chunk_starts.iter().enumerate() {
1498            let len = if ci + 1 < self.chunk_starts.len() {
1499                self.chunk_starts[ci + 1] - start
1500            } else {
1501                // last chunk length from first column
1502                if let Some(col0) = self.sheet.columns.first() {
1503                    col0.chunks[ci].type_tag.len()
1504                } else {
1505                    0
1506                }
1507            };
1508            let end = start + len - 1;
1509            let is = start.max(self.sr);
1510            let ie = end.min(row_end);
1511            if is > ie {
1512                continue;
1513            }
1514            let seg_len = ie - is + 1;
1515            let rel_off = is - start; // offset into chunk arrays
1516            // Collect per-column lane slices for columns in [sc..=ec]
1517            let mut cols = Vec::with_capacity(self.cols);
1518            for col_idx in self.sc..=self.ec {
1519                if col_idx >= self.sheet.columns.len() {
1520                    // Pad out-of-bounds columns with empty (null) lanes and Empty type_tag
1521
1522                    let numbers = Some(new_null_array(&DataType::Float64, seg_len));
1523                    let booleans = Some(new_null_array(&DataType::Boolean, seg_len));
1524                    let text = Some(new_null_array(&DataType::Utf8, seg_len));
1525                    let errors = Some(new_null_array(&DataType::UInt8, seg_len));
1526                    let type_tag: ArrayRef =
1527                        Arc::new(UInt8Array::from(vec![TypeTag::Empty as u8; seg_len]));
1528                    cols.push(ChunkCol {
1529                        numbers,
1530                        booleans,
1531                        text,
1532                        errors,
1533                        type_tag,
1534                    });
1535                } else {
1536                    let col = &self.sheet.columns[col_idx];
1537                    let ch = if ci < col.chunks.len() {
1538                        &col.chunks[ci]
1539                    } else {
1540                        // Should not happen with enforced alignment; pad as OOB if it does
1541                        let numbers = Some(new_null_array(&DataType::Float64, seg_len));
1542                        let booleans = Some(new_null_array(&DataType::Boolean, seg_len));
1543                        let text = Some(new_null_array(&DataType::Utf8, seg_len));
1544                        let errors = Some(new_null_array(&DataType::UInt8, seg_len));
1545                        let type_tag: ArrayRef =
1546                            Arc::new(UInt8Array::from(vec![TypeTag::Empty as u8; seg_len]));
1547                        cols.push(ChunkCol {
1548                            numbers,
1549                            booleans,
1550                            text,
1551                            errors,
1552                            type_tag,
1553                        });
1554                        continue;
1555                    };
1556                    use arrow_array::Array;
1557                    // Always provide a slice, lazily using per-chunk null arrays when the lane is absent
1558                    let numbers_base: ArrayRef = ch.numbers_or_null();
1559                    let booleans_base: ArrayRef = ch.booleans_or_null();
1560                    let text_base: ArrayRef = ch.text_or_null();
1561                    let errors_base: ArrayRef = ch.errors_or_null();
1562                    let numbers = Some(Array::slice(numbers_base.as_ref(), rel_off, seg_len));
1563                    let booleans = Some(Array::slice(booleans_base.as_ref(), rel_off, seg_len));
1564                    let text = Some(Array::slice(text_base.as_ref(), rel_off, seg_len));
1565                    let errors = Some(Array::slice(errors_base.as_ref(), rel_off, seg_len));
1566                    let type_tag: ArrayRef = Array::slice(ch.type_tag.as_ref(), rel_off, seg_len);
1567                    cols.push(ChunkCol {
1568                        numbers,
1569                        booleans,
1570                        text,
1571                        errors,
1572                        type_tag,
1573                    });
1574                }
1575            }
1576            out.push(ChunkSlice {
1577                row_start: is - self.sr,
1578                row_len: seg_len,
1579                cols,
1580            });
1581        }
1582        out
1583    }
1584
1585    /// Convenience iterator over row-aligned chunk slices.
1586    pub fn iter_row_chunks(&'a self) -> impl Iterator<Item = ChunkSlice> + 'a {
1587        self.row_chunk_slices().into_iter()
1588    }
1589
1590    /// Typed numeric slices per row-segment: (row_start, row_len, per-column Float64 arrays)
1591    pub fn numbers_slices(
1592        &'a self,
1593    ) -> impl Iterator<Item = (usize, usize, Vec<Arc<Float64Array>>)> + 'a {
1594        use crate::compute_prelude::zip_select;
1595        self.iter_row_chunks().map(move |cs| {
1596            let mut out_cols: Vec<Arc<Float64Array>> = Vec::with_capacity(cs.cols.len());
1597            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
1598                let base = cs.cols[local_c]
1599                    .numbers
1600                    .as_ref()
1601                    .expect("numbers lane exists")
1602                    .clone();
1603                let base_fa = base
1604                    .as_any()
1605                    .downcast_ref::<Float64Array>()
1606                    .unwrap()
1607                    .clone();
1608                let base_arc = Arc::new(base_fa);
1609
1610                // Identify chunk and overlay segment
1611                let abs_seg_start = self.sr + cs.row_start;
1612                let ch_idx = match self.chunk_starts.binary_search(&abs_seg_start) {
1613                    Ok(i) => i,
1614                    Err(0) => 0,
1615                    Err(i) => i - 1,
1616                };
1617                if col_idx >= self.sheet.columns.len() {
1618                    out_cols.push(base_arc);
1619                    continue;
1620                }
1621                let col = &self.sheet.columns[col_idx];
1622                let ch = &col.chunks[ch_idx];
1623                let rel_off = (self.sr + cs.row_start) - self.chunk_starts[ch_idx];
1624                let seg_range = rel_off..(rel_off + cs.row_len);
1625                let has_overlay = ch.overlay.any_in_range(seg_range.clone())
1626                    || (!ch.computed_overlay.is_empty()
1627                        && ch.computed_overlay.any_in_range(seg_range.clone()));
1628                if has_overlay {
1629                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
1630                    let mut ob = Float64Builder::with_capacity(cs.row_len);
1631                    for i in 0..cs.row_len {
1632                        if let Some(ov) = ch
1633                            .overlay
1634                            .get(rel_off + i)
1635                            .or_else(|| ch.computed_overlay.get(rel_off + i))
1636                        {
1637                            mask_b.append_value(true);
1638                            match ov {
1639                                OverlayValue::Number(n) => ob.append_value(*n),
1640                                _ => ob.append_null(),
1641                            }
1642                        } else {
1643                            mask_b.append_value(false);
1644                            ob.append_null();
1645                        }
1646                    }
1647                    let mask = mask_b.finish();
1648                    let overlay_vals = ob.finish();
1649                    let base_fa = base.as_any().downcast_ref::<Float64Array>().unwrap();
1650                    let zipped = zip_select(&mask, &overlay_vals, base_fa).expect("zip overlay");
1651                    let fa = zipped
1652                        .as_any()
1653                        .downcast_ref::<Float64Array>()
1654                        .unwrap()
1655                        .clone();
1656                    out_cols.push(Arc::new(fa));
1657                } else {
1658                    out_cols.push(base_arc);
1659                }
1660            }
1661            (cs.row_start, cs.row_len, out_cols)
1662        })
1663    }
1664
1665    /// Typed boolean slices per row-segment, overlay-aware via zip.
1666    pub fn booleans_slices(
1667        &'a self,
1668    ) -> impl Iterator<Item = (usize, usize, Vec<Arc<BooleanArray>>)> + 'a {
1669        use crate::compute_prelude::zip_select;
1670        self.iter_row_chunks().map(move |cs| {
1671            let mut out_cols: Vec<Arc<BooleanArray>> = Vec::with_capacity(cs.cols.len());
1672            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
1673                let base = cs.cols[local_c]
1674                    .booleans
1675                    .as_ref()
1676                    .expect("booleans lane exists")
1677                    .clone();
1678                let base_ba = base
1679                    .as_any()
1680                    .downcast_ref::<BooleanArray>()
1681                    .unwrap()
1682                    .clone();
1683                let base_arc: Arc<BooleanArray> = Arc::new(base_ba);
1684
1685                let abs_seg_start = self.sr + cs.row_start;
1686                let ch_idx = match self.chunk_starts.binary_search(&abs_seg_start) {
1687                    Ok(i) => i,
1688                    Err(0) => 0,
1689                    Err(i) => i - 1,
1690                };
1691                if col_idx >= self.sheet.columns.len() {
1692                    out_cols.push(base_arc);
1693                    continue;
1694                }
1695                let col = &self.sheet.columns[col_idx];
1696                let ch = &col.chunks[ch_idx];
1697                let rel_off = (self.sr + cs.row_start) - self.chunk_starts[ch_idx];
1698                let seg_range = rel_off..(rel_off + cs.row_len);
1699                let has_overlay = ch.overlay.any_in_range(seg_range.clone())
1700                    || (!ch.computed_overlay.is_empty()
1701                        && ch.computed_overlay.any_in_range(seg_range.clone()));
1702                if has_overlay {
1703                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
1704                    let mut bb = BooleanBuilder::with_capacity(cs.row_len);
1705                    for i in 0..cs.row_len {
1706                        if let Some(ov) = ch
1707                            .overlay
1708                            .get(rel_off + i)
1709                            .or_else(|| ch.computed_overlay.get(rel_off + i))
1710                        {
1711                            mask_b.append_value(true);
1712                            match ov {
1713                                OverlayValue::Boolean(b) => bb.append_value(*b),
1714                                _ => bb.append_null(),
1715                            }
1716                        } else {
1717                            mask_b.append_value(false);
1718                            bb.append_null();
1719                        }
1720                    }
1721                    let mask = mask_b.finish();
1722                    let overlay_vals = bb.finish();
1723                    let base_ba = base.as_any().downcast_ref::<BooleanArray>().unwrap();
1724                    let zipped =
1725                        zip_select(&mask, &overlay_vals, base_ba).expect("zip boolean overlay");
1726                    let ba = zipped
1727                        .as_any()
1728                        .downcast_ref::<BooleanArray>()
1729                        .unwrap()
1730                        .clone();
1731                    out_cols.push(Arc::new(ba));
1732                } else {
1733                    out_cols.push(base_arc);
1734                }
1735            }
1736            (cs.row_start, cs.row_len, out_cols)
1737        })
1738    }
1739
1740    /// Text slices per row-segment (erased as ArrayRef for Utf8 today; future Dict/View support).
1741    pub fn text_slices(&'a self) -> impl Iterator<Item = (usize, usize, Vec<ArrayRef>)> + 'a {
1742        use crate::compute_prelude::zip_select;
1743        self.iter_row_chunks().map(move |cs| {
1744            let mut out_cols: Vec<ArrayRef> = Vec::with_capacity(cs.cols.len());
1745            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
1746                let base = cs.cols[local_c]
1747                    .text
1748                    .as_ref()
1749                    .expect("text lane exists")
1750                    .clone();
1751                let abs_seg_start = self.sr + cs.row_start;
1752                let ch_idx = match self.chunk_starts.binary_search(&abs_seg_start) {
1753                    Ok(i) => i,
1754                    Err(0) => 0,
1755                    Err(i) => i - 1,
1756                };
1757                if col_idx >= self.sheet.columns.len() {
1758                    out_cols.push(base.clone());
1759                    continue;
1760                }
1761                let col = &self.sheet.columns[col_idx];
1762                let ch = &col.chunks[ch_idx];
1763                let rel_off = (self.sr + cs.row_start) - self.chunk_starts[ch_idx];
1764                let seg_range = rel_off..(rel_off + cs.row_len);
1765                let has_overlay = ch.overlay.any_in_range(seg_range.clone())
1766                    || (!ch.computed_overlay.is_empty()
1767                        && ch.computed_overlay.any_in_range(seg_range.clone()));
1768                if has_overlay {
1769                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
1770                    let mut sb = StringBuilder::with_capacity(cs.row_len, cs.row_len * 8);
1771                    for i in 0..cs.row_len {
1772                        if let Some(ov) = ch
1773                            .overlay
1774                            .get(rel_off + i)
1775                            .or_else(|| ch.computed_overlay.get(rel_off + i))
1776                        {
1777                            mask_b.append_value(true);
1778                            match ov {
1779                                OverlayValue::Text(s) => sb.append_value(s),
1780                                _ => sb.append_null(),
1781                            }
1782                        } else {
1783                            mask_b.append_value(false);
1784                            sb.append_null();
1785                        }
1786                    }
1787                    let mask = mask_b.finish();
1788                    let overlay_vals = sb.finish();
1789                    let base_sa = base.as_any().downcast_ref::<StringArray>().unwrap();
1790                    let zipped =
1791                        zip_select(&mask, &overlay_vals, base_sa).expect("zip text overlay");
1792                    out_cols.push(zipped);
1793                } else {
1794                    out_cols.push(base.clone());
1795                }
1796            }
1797            (cs.row_start, cs.row_len, out_cols)
1798        })
1799    }
1800
1801    /// Typed error-code slices per row-segment.
1802    pub fn errors_slices(
1803        &'a self,
1804    ) -> impl Iterator<Item = (usize, usize, Vec<Arc<UInt8Array>>)> + 'a {
1805        use crate::compute_prelude::zip_select;
1806        self.iter_row_chunks().map(move |cs| {
1807            let mut out_cols: Vec<Arc<UInt8Array>> = Vec::with_capacity(cs.cols.len());
1808            for (local_c, col_idx) in (self.sc..=self.ec).enumerate() {
1809                let base = cs.cols[local_c]
1810                    .errors
1811                    .as_ref()
1812                    .expect("errors lane exists")
1813                    .clone();
1814                let base_e = base.as_any().downcast_ref::<UInt8Array>().unwrap().clone();
1815                let base_arc: Arc<UInt8Array> = Arc::new(base_e);
1816                let abs_seg_start = self.sr + cs.row_start;
1817                let ch_idx = match self.chunk_starts.binary_search(&abs_seg_start) {
1818                    Ok(i) => i,
1819                    Err(0) => 0,
1820                    Err(i) => i - 1,
1821                };
1822                if col_idx >= self.sheet.columns.len() {
1823                    out_cols.push(base_arc);
1824                    continue;
1825                }
1826                let col = &self.sheet.columns[col_idx];
1827                let ch = &col.chunks[ch_idx];
1828                let rel_off = (self.sr + cs.row_start) - self.chunk_starts[ch_idx];
1829                let seg_range = rel_off..(rel_off + cs.row_len);
1830                let has_overlay = ch.overlay.any_in_range(seg_range.clone())
1831                    || (!ch.computed_overlay.is_empty()
1832                        && ch.computed_overlay.any_in_range(seg_range.clone()));
1833                if has_overlay {
1834                    let mut mask_b = BooleanBuilder::with_capacity(cs.row_len);
1835                    let mut eb = UInt8Builder::with_capacity(cs.row_len);
1836                    for i in 0..cs.row_len {
1837                        if let Some(ov) = ch
1838                            .overlay
1839                            .get(rel_off + i)
1840                            .or_else(|| ch.computed_overlay.get(rel_off + i))
1841                        {
1842                            mask_b.append_value(true);
1843                            match ov {
1844                                OverlayValue::Error(code) => eb.append_value(*code),
1845                                _ => eb.append_null(),
1846                            }
1847                        } else {
1848                            mask_b.append_value(false);
1849                            eb.append_null();
1850                        }
1851                    }
1852                    let mask = mask_b.finish();
1853                    let overlay_vals = eb.finish();
1854                    let base_ea = base.as_any().downcast_ref::<UInt8Array>().unwrap();
1855                    let zipped =
1856                        zip_select(&mask, &overlay_vals, base_ea).expect("zip err overlay");
1857                    let ea = zipped
1858                        .as_any()
1859                        .downcast_ref::<UInt8Array>()
1860                        .unwrap()
1861                        .clone();
1862                    out_cols.push(Arc::new(ea));
1863                } else {
1864                    out_cols.push(base_arc);
1865                }
1866            }
1867            (cs.row_start, cs.row_len, out_cols)
1868        })
1869    }
1870
1871    /// Build per-column concatenated lowered text arrays for this view.
1872    /// Uses per-chunk lowered cache for base text and merges overlays via zip_select.
1873    pub fn lowered_text_columns(&self) -> Vec<ArrayRef> {
1874        use arrow_array::Array;
1875        let mut out: Vec<ArrayRef> = Vec::with_capacity(self.cols);
1876        if self.rows == 0 || self.cols == 0 {
1877            return out;
1878        }
1879        let row_end = self.er.min(self.sheet.nrows.saturating_sub(1) as usize);
1880        for col_idx in self.sc..=self.ec {
1881            let mut segs: Vec<ArrayRef> = Vec::new();
1882            if col_idx >= self.sheet.columns.len() {
1883                // OOB: nulls across rows
1884                segs.push(new_null_array(&DataType::Utf8, self.rows));
1885            } else {
1886                let col_ref = &self.sheet.columns[col_idx];
1887                for (ci, &start) in self.chunk_starts.iter().enumerate() {
1888                    // length of this chunk
1889                    let len = col_ref
1890                        .chunks
1891                        .get(ci)
1892                        .map(|c| c.type_tag.len())
1893                        .unwrap_or(0);
1894                    if len == 0 {
1895                        continue;
1896                    }
1897                    let end = start + len - 1;
1898                    let is = start.max(self.sr);
1899                    let ie = end.min(row_end);
1900                    if is > ie {
1901                        continue;
1902                    }
1903                    let seg_len = ie - is + 1;
1904                    let rel_off = is - start;
1905                    if let Some(ch) = col_ref.chunks.get(ci) {
1906                        // Overlay-aware lowered segment
1907                        let has_overlay = ch.overlay.any_in_range(rel_off..(rel_off + seg_len))
1908                            || (!ch.computed_overlay.is_empty()
1909                                && ch
1910                                    .computed_overlay
1911                                    .any_in_range(rel_off..(rel_off + seg_len)));
1912                        if has_overlay {
1913                            // Build lowered overlay values builder
1914                            let mut sb = arrow_array::builder::StringBuilder::with_capacity(
1915                                seg_len,
1916                                seg_len * 8,
1917                            );
1918                            // mask overlaid rows
1919                            let mut mb =
1920                                arrow_array::builder::BooleanBuilder::with_capacity(seg_len);
1921                            for i in 0..seg_len {
1922                                if let Some(ov) = ch
1923                                    .overlay
1924                                    .get(rel_off + i)
1925                                    .or_else(|| ch.computed_overlay.get(rel_off + i))
1926                                {
1927                                    mb.append_value(true);
1928                                    match ov {
1929                                        OverlayValue::Text(s) => {
1930                                            sb.append_value(s.to_ascii_lowercase());
1931                                        }
1932                                        OverlayValue::Empty => {
1933                                            sb.append_null();
1934                                        }
1935                                        OverlayValue::Number(n) => {
1936                                            sb.append_value(n.to_string());
1937                                        }
1938                                        OverlayValue::Boolean(b) => {
1939                                            sb.append_value(if *b { "true" } else { "false" });
1940                                        }
1941                                        OverlayValue::Error(_) | OverlayValue::Pending => {
1942                                            sb.append_null();
1943                                        }
1944                                    }
1945                                } else {
1946                                    // not overlaid
1947                                    sb.append_null();
1948                                    mb.append_value(false);
1949                                }
1950                            }
1951                            let overlay_vals = sb.finish();
1952                            let mask = mb.finish();
1953                            // base lowered segment
1954                            let base_lowered = ch.text_lower_or_null();
1955                            let base_seg = Array::slice(&base_lowered, rel_off, seg_len);
1956                            let base_sa = base_seg
1957                                .as_any()
1958                                .downcast_ref::<StringArray>()
1959                                .expect("lowered slice downcast");
1960                            let zipped = zip_select(&mask, &overlay_vals, base_sa)
1961                                .expect("zip lowered text overlay");
1962                            segs.push(zipped);
1963                        } else {
1964                            // No overlay: slice from lowered base
1965                            let lowered = ch.text_lower_or_null();
1966                            segs.push(Array::slice(&lowered, rel_off, seg_len));
1967                        }
1968                    }
1969                }
1970            }
1971            // Concat segments for this column
1972            let anys: Vec<&dyn Array> = segs.iter().map(|a| a.as_ref() as &dyn Array).collect();
1973            let conc = concat_arrays(&anys).expect("concat lowered segments");
1974            out.push(conc);
1975        }
1976        out
1977    }
1978}
1979
/// One row-aligned segment of a range view, as produced by `row_chunk_slices`.
pub struct ChunkSlice {
    /// First row of the segment, counted from the top of the view.
    pub row_start: usize, // relative to view top
    /// Number of rows covered by the segment.
    pub row_len: usize,
    /// Lane slices for each view column (`sc..=ec`), in column order.
    pub cols: Vec<ChunkCol>,
}
1985
/// The typed lanes of a single column within a [`ChunkSlice`].
///
/// `row_chunk_slices` fills every optional lane with `Some`, substituting all-null
/// arrays for columns outside the sheet.
pub struct ChunkCol {
    /// Numeric lane (Float64).
    pub numbers: Option<ArrayRef>,
    /// Boolean lane.
    pub booleans: Option<ArrayRef>,
    /// Text lane (Utf8).
    pub text: Option<ArrayRef>,
    /// Error-code lane (UInt8).
    pub errors: Option<ArrayRef>,
    /// Per-row `TypeTag` values stored as UInt8.
    pub type_tag: ArrayRef,
}
1993
1994#[cfg(test)]
1995mod tests {
1996    use super::*;
1997    use arrow_array::Array;
1998    use arrow_schema::DataType;
1999
2000    #[test]
2001    fn ingest_mixed_rows_into_lanes_and_tags() {
2002        let mut b = IngestBuilder::new("Sheet1", 1, 1024, crate::engine::DateSystem::Excel1900);
2003        let data = vec![
2004            LiteralValue::Number(42.5),                   // Number
2005            LiteralValue::Empty,                          // Empty
2006            LiteralValue::Text(String::new()),            // Empty text (Text lane)
2007            LiteralValue::Boolean(true),                  // Boolean
2008            LiteralValue::Error(ExcelError::new_value()), // Error
2009        ];
2010        for v in &data {
2011            b.append_row(std::slice::from_ref(v)).unwrap();
2012        }
2013        let sheet = b.finish();
2014        assert_eq!(sheet.nrows, 5);
2015        assert_eq!(sheet.columns.len(), 1);
2016        assert_eq!(sheet.columns[0].chunks.len(), 1);
2017        let ch = &sheet.columns[0].chunks[0];
2018
2019        // Type tags
2020        let tags = ch.type_tag.values();
2021        assert_eq!(tags.len(), 5);
2022        assert_eq!(tags[0], TypeTag::Number as u8);
2023        assert_eq!(tags[1], TypeTag::Empty as u8);
2024        assert_eq!(tags[2], TypeTag::Text as u8);
2025        assert_eq!(tags[3], TypeTag::Boolean as u8);
2026        assert_eq!(tags[4], TypeTag::Error as u8);
2027
2028        // Numbers lane validity
2029        let nums = ch.numbers.as_ref().unwrap();
2030        assert_eq!(nums.len(), 5);
2031        assert_eq!(nums.null_count(), 4);
2032        assert!(nums.is_valid(0));
2033
2034        // Booleans lane validity
2035        let bools = ch.booleans.as_ref().unwrap();
2036        assert_eq!(bools.len(), 5);
2037        assert_eq!(bools.null_count(), 4);
2038        assert!(bools.is_valid(3));
2039
2040        // Text lane validity
2041        let txt = ch.text.as_ref().unwrap();
2042        assert_eq!(txt.len(), 5);
2043        assert_eq!(txt.null_count(), 4);
2044        assert!(txt.is_valid(2)); // ""
2045
2046        // Errors lane
2047        let errs = ch.errors.as_ref().unwrap();
2048        assert_eq!(errs.len(), 5);
2049        assert_eq!(errs.null_count(), 4);
2050        assert!(errs.is_valid(4));
2051    }
2052
2053    #[test]
2054    fn range_view_get_cell_and_padding() {
2055        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
2056        b.append_row(&[LiteralValue::Number(1.0), LiteralValue::Text("".into())])
2057            .unwrap();
2058        b.append_row(&[LiteralValue::Empty, LiteralValue::Text("x".into())])
2059            .unwrap();
2060        b.append_row(&[LiteralValue::Boolean(true), LiteralValue::Empty])
2061            .unwrap();
2062        let sheet = b.finish();
2063        let rv = sheet.range_view(0, 0, 2, 1);
2064        assert_eq!(rv.dims(), (3, 2));
2065        // Inside
2066        assert_eq!(rv.get_cell(0, 0), LiteralValue::Number(1.0));
2067        assert_eq!(rv.get_cell(0, 1), LiteralValue::Text(String::new())); // empty string
2068        assert_eq!(rv.get_cell(1, 0), LiteralValue::Empty); // truly Empty
2069        assert_eq!(rv.get_cell(2, 0), LiteralValue::Boolean(true));
2070        // OOB padding
2071        assert_eq!(rv.get_cell(3, 0), LiteralValue::Empty);
2072        assert_eq!(rv.get_cell(0, 2), LiteralValue::Empty);
2073
2074        // Numbers slices should produce one 2-row and one 1-row segment
2075        let nums: Vec<_> = rv.numbers_slices().collect();
2076        assert_eq!(nums.len(), 2);
2077        assert_eq!(nums[0].0, 0);
2078        assert_eq!(nums[0].1, 2);
2079        assert_eq!(nums[1].0, 2);
2080        assert_eq!(nums[1].1, 1);
2081    }
2082
2083    #[test]
2084    fn overlay_precedence_user_over_computed() {
2085        let mut b = IngestBuilder::new("S", 1, 8, crate::engine::DateSystem::Excel1900);
2086        b.append_row(&[LiteralValue::Number(1.0)]).unwrap();
2087        b.append_row(&[LiteralValue::Empty]).unwrap();
2088        b.append_row(&[LiteralValue::Empty]).unwrap();
2089        let mut sheet = b.finish();
2090
2091        let (ch_i, off) = sheet.chunk_of_row(0).unwrap();
2092        sheet.columns[0].chunks[ch_i]
2093            .computed_overlay
2094            .set(off, OverlayValue::Number(2.0));
2095
2096        let rv0 = sheet.range_view(0, 0, 0, 0);
2097        assert_eq!(rv0.get_cell(0, 0), LiteralValue::Number(2.0));
2098        let nums0: Vec<_> = rv0.numbers_slices().collect();
2099        assert_eq!(nums0.len(), 1);
2100        assert_eq!(nums0[0].2[0].value(0), 2.0);
2101
2102        sheet.columns[0].chunks[ch_i]
2103            .overlay
2104            .set(off, OverlayValue::Number(3.0));
2105
2106        let rv1 = sheet.range_view(0, 0, 0, 0);
2107        assert_eq!(rv1.get_cell(0, 0), LiteralValue::Number(3.0));
2108        let nums1: Vec<_> = rv1.numbers_slices().collect();
2109        assert_eq!(nums1.len(), 1);
2110        assert_eq!(nums1[0].2[0].value(0), 3.0);
2111    }
2112
2113    #[test]
2114    fn row_chunk_slices_shape() {
2115        // chunk_rows=2 leads to two slices for 3 rows
2116        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
2117        b.append_row(&[LiteralValue::Text("a".into()), LiteralValue::Number(1.0)])
2118            .unwrap();
2119        b.append_row(&[LiteralValue::Text("b".into()), LiteralValue::Number(2.0)])
2120            .unwrap();
2121        b.append_row(&[LiteralValue::Text("c".into()), LiteralValue::Number(3.0)])
2122            .unwrap();
2123        let sheet = b.finish();
2124        let rv = sheet.range_view(0, 0, 2, 1);
2125        let slices = rv.row_chunk_slices();
2126        assert_eq!(slices.len(), 2);
2127        assert_eq!(slices[0].row_start, 0);
2128        assert_eq!(slices[0].row_len, 2);
2129        assert_eq!(slices[0].cols.len(), 2);
2130        assert_eq!(slices[1].row_start, 2);
2131        assert_eq!(slices[1].row_len, 1);
2132        assert_eq!(slices[1].cols.len(), 2);
2133    }
2134
2135    #[test]
2136    fn oob_columns_are_padded() {
2137        // Build with 2 columns; request 3 columns (ec beyond last col)
2138        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
2139        b.append_row(&[LiteralValue::Number(1.0), LiteralValue::Text("a".into())])
2140            .unwrap();
2141        b.append_row(&[LiteralValue::Number(2.0), LiteralValue::Text("b".into())])
2142            .unwrap();
2143        let sheet = b.finish();
2144        // Request cols [0..=2] → 3 columns with padding
2145        let rv = sheet.range_view(0, 0, 1, 2);
2146        assert_eq!(rv.dims(), (2, 3));
2147        let slices = rv.row_chunk_slices();
2148        assert!(!slices.is_empty());
2149        for cs in &slices {
2150            assert_eq!(cs.cols.len(), 3);
2151        }
2152        // Also validate typed slices return 3 entries per segment
2153        for (_rs, _rl, cols) in rv.numbers_slices() {
2154            assert_eq!(cols.len(), 3);
2155        }
2156        for (_rs, _rl, cols) in rv.booleans_slices() {
2157            assert_eq!(cols.len(), 3);
2158        }
2159        for (_rs, _rl, cols) in rv.text_slices() {
2160            assert_eq!(cols.len(), 3);
2161        }
2162        for (_rs, _rl, cols) in rv.errors_slices() {
2163            assert_eq!(cols.len(), 3);
2164        }
2165    }
2166
2167    #[test]
2168    fn reversed_range_is_empty() {
2169        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
2170        b.append_row(&[LiteralValue::Number(1.0)]).unwrap();
2171        b.append_row(&[LiteralValue::Number(2.0)]).unwrap();
2172        let sheet = b.finish();
2173        let rv = sheet.range_view(3, 0, 1, 0); // er < sr
2174        assert_eq!(rv.dims(), (0, 0));
2175        assert!(rv.row_chunk_slices().is_empty());
2176        assert_eq!(rv.get_cell(0, 0), LiteralValue::Empty);
2177    }
2178
2179    #[test]
2180    fn chunk_alignment_invariant() {
2181        let mut b = IngestBuilder::new("S", 3, 2, crate::engine::DateSystem::Excel1900);
2182        // 5 rows, 2-row chunks => 3 chunks (2,2,1)
2183        for r in 0..5 {
2184            b.append_row(&[
2185                LiteralValue::Number(r as f64),
2186                LiteralValue::Text(format!("{r}")),
2187                if r % 2 == 0 {
2188                    LiteralValue::Empty
2189                } else {
2190                    LiteralValue::Boolean(true)
2191                },
2192            ])
2193            .unwrap();
2194        }
2195        let sheet = b.finish();
2196        // chunk_starts should be [0,2,4]
2197        assert_eq!(sheet.chunk_starts, vec![0, 2, 4]);
2198        // All columns must share per-chunk lengths equal to [2,2,1]
2199        let lens0: Vec<usize> = sheet.columns[0]
2200            .chunks
2201            .iter()
2202            .map(|ch| ch.type_tag.len())
2203            .collect();
2204        for col in &sheet.columns[1..] {
2205            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2206            assert_eq!(lens, lens0);
2207        }
2208    }
2209
2210    #[test]
2211    fn chunking_splits_rows() {
2212        // Two columns, chunk size 2 → expect two chunks
2213        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
2214        let rows = vec![
2215            vec![LiteralValue::Number(1.0), LiteralValue::Text("a".into())],
2216            vec![LiteralValue::Empty, LiteralValue::Text("b".into())],
2217            vec![LiteralValue::Boolean(true), LiteralValue::Empty],
2218        ];
2219        for r in rows {
2220            b.append_row(&r).unwrap();
2221        }
2222        let sheet = b.finish();
2223        assert_eq!(sheet.columns[0].chunks.len(), 2);
2224        assert_eq!(sheet.columns[1].chunks.len(), 2);
2225        assert_eq!(sheet.columns[0].chunks[0].numbers_or_null().len(), 2);
2226        assert_eq!(sheet.columns[0].chunks[1].numbers_or_null().len(), 1);
2227    }
2228
2229    #[test]
2230    fn pending_is_not_error() {
2231        let mut b = IngestBuilder::new("S", 1, 8, crate::engine::DateSystem::Excel1900);
2232        b.append_row(&[LiteralValue::Pending]).unwrap();
2233        let sheet = b.finish();
2234        let ch = &sheet.columns[0].chunks[0];
2235        // tag is Pending
2236        assert_eq!(ch.type_tag.values()[0], super::TypeTag::Pending as u8);
2237        // errors lane is effectively null
2238        let errs = ch.errors_or_null();
2239        assert_eq!(errs.null_count(), 1);
2240    }
2241
2242    #[test]
2243    fn all_null_numeric_lane_uses_null_array() {
2244        // Only text values in first column → numbers lane should be all null with correct dtype
2245        let mut b = IngestBuilder::new("S", 1, 16, crate::engine::DateSystem::Excel1900);
2246        b.append_row(&[LiteralValue::Text("a".into())]).unwrap();
2247        b.append_row(&[LiteralValue::Text("".into())]).unwrap();
2248        b.append_row(&[LiteralValue::Text("b".into())]).unwrap();
2249        let sheet = b.finish();
2250        let ch = &sheet.columns[0].chunks[0];
2251        let nums = ch.numbers_or_null();
2252        assert_eq!(nums.len(), 3);
2253        assert_eq!(nums.null_count(), 3);
2254        assert_eq!(nums.data_type(), &DataType::Float64);
2255    }
2256
2257    #[test]
2258    fn row_insert_delete_across_chunk_boundaries_with_overlays() {
2259        // Build 1 column, chunk size 4, 10 rows -> chunks at [0..4],[4..8],[8..10]
2260        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
2261        for _ in 0..10 {
2262            b.append_row(&[LiteralValue::Empty]).unwrap();
2263        }
2264        let mut sheet = b.finish();
2265        // Add overlays at row 3 and row 4
2266        {
2267            let (c0, o0) = sheet.chunk_of_row(3).unwrap();
2268            sheet.columns[0].chunks[c0]
2269                .overlay
2270                .set(o0, OverlayValue::Number(30.0));
2271            let (c1, o1) = sheet.chunk_of_row(4).unwrap();
2272            sheet.columns[0].chunks[c1]
2273                .overlay
2274                .set(o1, OverlayValue::Number(40.0));
2275        }
2276        // Insert 2 rows before row 4 (at chunk boundary)
2277        sheet.insert_rows(4, 2);
2278        assert_eq!(sheet.nrows, 12);
2279        // Validate overlays moved correctly: 3 stays, 4 becomes Empty, 6 has 40
2280        let av = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2281        assert_eq!(av.get_cell(3, 0), LiteralValue::Number(30.0));
2282        assert_eq!(av.get_cell(4, 0), LiteralValue::Empty);
2283        assert_eq!(av.get_cell(6, 0), LiteralValue::Number(40.0));
2284
2285        // Now delete 3 rows starting at 3: removes rows 3,4,5 → moves 40.0 from 6 → 3
2286        sheet.delete_rows(3, 3);
2287        assert_eq!(sheet.nrows, 9);
2288        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2289        assert_eq!(av2.get_cell(3, 0), LiteralValue::Number(40.0));
2290        // All columns share chunk lengths; chunk_starts monotonic and cover nrows
2291        let lens0: Vec<usize> = sheet.columns[0]
2292            .chunks
2293            .iter()
2294            .map(|ch| ch.type_tag.len())
2295            .collect();
2296        for col in &sheet.columns {
2297            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2298            assert_eq!(lens, lens0);
2299        }
2300        // chunk_starts should be monotonic and final chunk end == nrows
2301        assert!(sheet.chunk_starts.windows(2).all(|w| w[0] < w[1]));
2302        let last_start = *sheet.chunk_starts.last().unwrap_or(&0);
2303        let last_len = sheet.columns[0]
2304            .chunks
2305            .last()
2306            .map(|c| c.type_tag.len())
2307            .unwrap_or(0);
2308        assert_eq!(last_start + last_len, sheet.nrows as usize);
2309    }
2310
2311    #[test]
2312    fn column_insert_delete_retains_chunk_alignment() {
2313        let mut b = IngestBuilder::new("S", 3, 3, crate::engine::DateSystem::Excel1900);
2314        for _ in 0..5 {
2315            b.append_row(&[
2316                LiteralValue::Empty,
2317                LiteralValue::Empty,
2318                LiteralValue::Empty,
2319            ])
2320            .unwrap();
2321        }
2322        let mut sheet = b.finish();
2323        // Record reference chunk lengths of first column
2324        let ref_lens: Vec<usize> = sheet.columns[0]
2325            .chunks
2326            .iter()
2327            .map(|ch| ch.type_tag.len())
2328            .collect();
2329        // Insert 2 columns before index 1
2330        sheet.insert_columns(1, 2);
2331        assert_eq!(sheet.columns.len(), 5);
2332        for col in &sheet.columns {
2333            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2334            assert_eq!(lens, ref_lens);
2335        }
2336        let starts_before = sheet.chunk_starts.clone();
2337        // Delete 2 columns starting at index 2 → back to 3 columns
2338        sheet.delete_columns(2, 2);
2339        assert_eq!(sheet.columns.len(), 3);
2340        for col in &sheet.columns {
2341            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2342            assert_eq!(lens, ref_lens);
2343        }
2344        // chunk_starts unchanged by column operations
2345        assert_eq!(sheet.chunk_starts, starts_before);
2346    }
2347
2348    #[test]
2349    fn multiple_adjacent_row_ops_overlay_mixed_types() {
2350        use formualizer_common::ExcelErrorKind;
2351        // Two columns to ensure alignment preserved across columns
2352        let mut b = IngestBuilder::new("S", 2, 3, crate::engine::DateSystem::Excel1900);
2353        for _ in 0..9 {
2354            b.append_row(&[LiteralValue::Empty, LiteralValue::Empty])
2355                .unwrap();
2356        }
2357        let mut sheet = b.finish();
2358        // Overlays at rows (0-based): 2->Number, 3->Text, 5->Boolean, 6->Error, 8->Empty
2359        // Column 0 only
2360        let set_ov = |sh: &mut ArrowSheet, row: usize, ov: OverlayValue| {
2361            let (ch_i, off) = sh.chunk_of_row(row).unwrap();
2362            sh.columns[0].chunks[ch_i].overlay.set(off, ov);
2363        };
2364        set_ov(&mut sheet, 2, OverlayValue::Number(12.5));
2365        set_ov(&mut sheet, 3, OverlayValue::Text(Arc::from("hello")));
2366        set_ov(&mut sheet, 5, OverlayValue::Boolean(true));
2367        set_ov(
2368            &mut sheet,
2369            6,
2370            OverlayValue::Error(map_error_code(ExcelErrorKind::Div)),
2371        );
2372        set_ov(&mut sheet, 8, OverlayValue::Empty);
2373
2374        // Insert 1 row before index 3
2375        sheet.insert_rows(3, 1);
2376        // Expected new positions: 2->2 (unchanged), 3->4, 5->6, 6->7, 8->9
2377        let av1 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2378        assert_eq!(av1.get_cell(2, 0), LiteralValue::Number(12.5));
2379        assert_eq!(av1.get_cell(4, 0), LiteralValue::Text("hello".into()));
2380        assert_eq!(av1.get_cell(6, 0), LiteralValue::Boolean(true));
2381        match av1.get_cell(7, 0) {
2382            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
2383            other => panic!("expected error at row 7, got {other:?}"),
2384        }
2385        assert_eq!(av1.get_cell(9, 0), LiteralValue::Empty);
2386
2387        // Insert 2 rows before index 4 (adjacent to previous region)
2388        sheet.insert_rows(4, 2);
2389        // Now positions: 2->2, 4->6, 6->8, 7->9, 9->11
2390        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2391        assert_eq!(av2.get_cell(2, 0), LiteralValue::Number(12.5));
2392        assert_eq!(av2.get_cell(6, 0), LiteralValue::Text("hello".into()));
2393        assert_eq!(av2.get_cell(8, 0), LiteralValue::Boolean(true));
2394        match av2.get_cell(9, 0) {
2395            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
2396            other => panic!("expected error at row 9, got {other:?}"),
2397        }
2398        assert_eq!(av2.get_cell(11, 0), LiteralValue::Empty);
2399
2400        // Delete 2 rows starting at index 6 → removes the text at 6 and one empty row
2401        sheet.delete_rows(6, 2);
2402        let av3 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2403        // Remaining expected: 2->Number 12.5, 6 (was 8)->true, 7 (was 9)->#DIV/0!, 9 (was 11)->Empty
2404        assert_eq!(av3.get_cell(2, 0), LiteralValue::Number(12.5));
2405        assert_eq!(av3.get_cell(6, 0), LiteralValue::Boolean(true));
2406        match av3.get_cell(7, 0) {
2407            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
2408            other => panic!("expected error at row 8, got {other:?}"),
2409        }
2410        assert_eq!(av3.get_cell(9, 0), LiteralValue::Empty);
2411
2412        // Alignment checks
2413        let lens0: Vec<usize> = sheet.columns[0]
2414            .chunks
2415            .iter()
2416            .map(|ch| ch.type_tag.len())
2417            .collect();
2418        for col in &sheet.columns {
2419            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2420            assert_eq!(lens, lens0);
2421        }
2422        // chunk_starts monotonically increasing and cover nrows
2423        assert!(sheet.chunk_starts.windows(2).all(|w| w[0] < w[1]));
2424        let last_start = *sheet.chunk_starts.last().unwrap_or(&0);
2425        let last_len = sheet.columns[0]
2426            .chunks
2427            .last()
2428            .map(|c| c.type_tag.len())
2429            .unwrap_or(0);
2430        assert_eq!(last_start + last_len, sheet.nrows as usize);
2431    }
2432
2433    #[test]
2434    fn multiple_adjacent_column_ops_alignment() {
2435        // Start with 2 columns, chunk_rows=2, rows=5
2436        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
2437        for _ in 0..5 {
2438            b.append_row(&[LiteralValue::Empty, LiteralValue::Empty])
2439                .unwrap();
2440        }
2441        let mut sheet = b.finish();
2442        let ref_lens: Vec<usize> = sheet.columns[0]
2443            .chunks
2444            .iter()
2445            .map(|ch| ch.type_tag.len())
2446            .collect();
2447        // Insert 1 at start, then 2 at index 2 → columns = 5
2448        sheet.insert_columns(0, 1);
2449        sheet.insert_columns(2, 2);
2450        assert_eq!(sheet.columns.len(), 5);
2451        for col in &sheet.columns {
2452            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2453            assert_eq!(lens, ref_lens);
2454        }
2455        let starts_before = sheet.chunk_starts.clone();
2456        // Delete 1 at index 1, then 2 at the end if available
2457        sheet.delete_columns(1, 1);
2458        let remain = sheet.columns.len();
2459        if remain >= 3 {
2460            sheet.delete_columns(remain - 2, 2);
2461        }
2462        for col in &sheet.columns {
2463            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2464            assert_eq!(lens, ref_lens);
2465        }
2466        assert_eq!(sheet.chunk_starts, starts_before);
2467    }
2468
2469    #[test]
2470    fn overlays_on_multiple_columns_row_col_ops() {
2471        // 3 columns, chunk_rows=3, rows=6 → chunks [0..3), [3..6)
2472        let mut b = IngestBuilder::new("S", 3, 3, crate::engine::DateSystem::Excel1900);
2473        for _ in 0..6 {
2474            b.append_row(&[
2475                LiteralValue::Empty,
2476                LiteralValue::Empty,
2477                LiteralValue::Empty,
2478            ])
2479            .unwrap();
2480        }
2481        let mut sheet = b.finish();
2482        // Overlays at row2 and row3 across columns with different types
2483        let set_ov = |sh: &mut ArrowSheet, col: usize, row: usize, ov: OverlayValue| {
2484            let (ch_i, off) = sh.chunk_of_row(row).unwrap();
2485            sh.columns[col].chunks[ch_i].overlay.set(off, ov);
2486        };
2487        set_ov(&mut sheet, 0, 2, OverlayValue::Number(12.0));
2488        set_ov(&mut sheet, 1, 2, OverlayValue::Text(Arc::from("xx")));
2489        set_ov(&mut sheet, 2, 2, OverlayValue::Boolean(true));
2490        set_ov(&mut sheet, 0, 3, OverlayValue::Number(33.0));
2491        set_ov(&mut sheet, 1, 3, OverlayValue::Text(Arc::from("yy")));
2492        set_ov(&mut sheet, 2, 3, OverlayValue::Boolean(false));
2493
2494        // Insert a row at boundary (before row index 3)
2495        sheet.insert_rows(3, 1);
2496        // Now original row>=3 shift down by 1
2497        let av = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 2);
2498        // Row 2 values unchanged
2499        assert_eq!(av.get_cell(2, 0), LiteralValue::Number(12.0));
2500        assert_eq!(av.get_cell(2, 1), LiteralValue::Text("xx".into()));
2501        assert_eq!(av.get_cell(2, 2), LiteralValue::Boolean(true));
2502        // Row 3 became Empty (inserted)
2503        assert_eq!(av.get_cell(3, 0), LiteralValue::Empty);
2504        // Row 4 holds old row 3 overlays
2505        assert_eq!(av.get_cell(4, 0), LiteralValue::Number(33.0));
2506        assert_eq!(av.get_cell(4, 1), LiteralValue::Text("yy".into()));
2507        assert_eq!(av.get_cell(4, 2), LiteralValue::Boolean(false));
2508
2509        // Delete column 1 (middle), values shift left
2510        sheet.delete_columns(1, 1);
2511        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 1);
2512        assert_eq!(av2.get_cell(2, 0), LiteralValue::Number(12.0));
2513        // Column 1 now was old column 2
2514        assert_eq!(av2.get_cell(2, 1), LiteralValue::Boolean(true));
2515        assert_eq!(av2.get_cell(4, 0), LiteralValue::Number(33.0));
2516        assert_eq!(av2.get_cell(4, 1), LiteralValue::Boolean(false));
2517
2518        // Alignment preserved
2519        let lens0: Vec<usize> = sheet.columns[0]
2520            .chunks
2521            .iter()
2522            .map(|ch| ch.type_tag.len())
2523            .collect();
2524        for col in &sheet.columns {
2525            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2526            assert_eq!(lens, lens0);
2527        }
2528    }
2529
2530    #[test]
2531    fn effective_slices_overlay_precedence_numbers_text() {
2532        // 1 column, chunk_rows=3, rows=6. Base numbers in lane; overlays include text on row1 and number on row4.
2533        let mut b = IngestBuilder::new("S", 1, 3, crate::engine::DateSystem::Excel1900);
2534        for i in 0..6 {
2535            b.append_row(&[LiteralValue::Number((i + 1) as f64)])
2536                .unwrap();
2537        }
2538        let mut sheet = b.finish();
2539        // Overlays: row1 -> Text("X"), row4 -> Number(99)
2540        let (c1, o1) = sheet.chunk_of_row(1).unwrap();
2541        sheet.columns[0].chunks[c1]
2542            .overlay
2543            .set(o1, OverlayValue::Text(Arc::from("X")));
2544        let (c4, o4) = sheet.chunk_of_row(4).unwrap();
2545        sheet.columns[0].chunks[c4]
2546            .overlay
2547            .set(o4, OverlayValue::Number(99.0));
2548
2549        let av = sheet.range_view(0, 0, 5, 0);
2550        // Validate numbers_slices: row1 should be null (text overlay), row4 should be 99.0, others base
2551        let mut numeric: Vec<Option<f64>> = vec![None; 6];
2552        for (row_start, row_len, cols) in av.numbers_slices() {
2553            let a = &cols[0];
2554            for i in 0..row_len {
2555                let idx = row_start + i;
2556                numeric[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
2557            }
2558        }
2559        assert_eq!(numeric[0], Some(1.0));
2560        assert_eq!(numeric[1], None); // overshadowed by text overlay
2561        assert_eq!(numeric[2], Some(3.0));
2562        assert_eq!(numeric[3], Some(4.0));
2563        assert_eq!(numeric[4], Some(99.0));
2564        assert_eq!(numeric[5], Some(6.0));
2565
2566        // Validate text_slices: row1 has "X", others null
2567        let mut texts: Vec<Option<String>> = vec![None; 6];
2568        for (row_start, row_len, cols) in av.text_slices() {
2569            let a = cols[0].as_any().downcast_ref::<StringArray>().unwrap();
2570            for i in 0..row_len {
2571                let idx = row_start + i;
2572                texts[idx] = if a.is_null(i) {
2573                    None
2574                } else {
2575                    Some(a.value(i).to_string())
2576                };
2577            }
2578        }
2579        assert_eq!(texts[1].as_deref(), Some("X"));
2580        assert!(texts[0].is_none());
2581        assert!(texts[2].is_none());
2582        assert!(texts[3].is_none());
2583        assert!(texts[4].is_none());
2584        assert!(texts[5].is_none());
2585    }
2586
2587    #[test]
2588    fn effective_slices_overlay_precedence_booleans() {
2589        // Base booleans over 1 column; overlays include boolean and non-boolean types.
2590        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
2591        for i in 0..6 {
2592            let v = if i % 2 == 0 {
2593                LiteralValue::Boolean(true)
2594            } else {
2595                LiteralValue::Boolean(false)
2596            };
2597            b.append_row(&[v]).unwrap();
2598        }
2599        let mut sheet = b.finish();
2600        // Overlays: row1 -> Boolean(true), row2 -> Text("T")
2601        let (c1, o1) = sheet.chunk_of_row(1).unwrap();
2602        sheet.columns[0].chunks[c1]
2603            .overlay
2604            .set(o1, OverlayValue::Boolean(true));
2605        let (c2, o2) = sheet.chunk_of_row(2).unwrap();
2606        sheet.columns[0].chunks[c2]
2607            .overlay
2608            .set(o2, OverlayValue::Text(Arc::from("T")));
2609
2610        let av = sheet.range_view(0, 0, 5, 0);
2611        // Validate booleans_slices: row1 should be true (overlay), row2 should be null (text overlay), others base
2612        let mut bools: Vec<Option<bool>> = vec![None; 6];
2613        for (row_start, row_len, cols) in av.booleans_slices() {
2614            let a = &cols[0];
2615            for i in 0..row_len {
2616                let idx = row_start + i;
2617                bools[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
2618            }
2619        }
2620        assert_eq!(bools[0], Some(true));
2621        assert_eq!(bools[1], Some(true)); // overlay to true
2622        assert_eq!(bools[2], None); // overshadowed by text overlay
2623        // spot-check others remain base
2624        assert_eq!(bools[3], Some(false));
2625    }
2626
2627    #[test]
2628    fn effective_slices_overlay_precedence_errors() {
2629        // Base numbers; overlay an error at one row and ensure errors_slices reflect it.
2630        let mut b = IngestBuilder::new("S", 1, 3, crate::engine::DateSystem::Excel1900);
2631        for i in 0..6 {
2632            b.append_row(&[LiteralValue::Number((i + 1) as f64)])
2633                .unwrap();
2634        }
2635        let mut sheet = b.finish();
2636        // Overlay error at row 4
2637        let (c4, o4) = sheet.chunk_of_row(4).unwrap();
2638        sheet.columns[0].chunks[c4]
2639            .overlay
2640            .set(o4, OverlayValue::Error(map_error_code(ExcelErrorKind::Div)));
2641
2642        let av = sheet.range_view(0, 0, 5, 0);
2643        let mut errs: Vec<Option<u8>> = vec![None; 6];
2644        for (row_start, row_len, cols) in av.errors_slices() {
2645            let a = &cols[0];
2646            for i in 0..row_len {
2647                let idx = row_start + i;
2648                errs[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
2649            }
2650        }
2651        assert_eq!(errs[4], Some(map_error_code(ExcelErrorKind::Div)));
2652        assert!(errs[3].is_none());
2653    }
2654}