Skip to main content

formualizer_eval/arrow_store/
mod.rs

1use arrow_array::Array;
2use arrow_array::new_null_array;
3use arrow_schema::DataType;
4use chrono::Timelike;
5use std::sync::Arc;
6
7use arrow_array::builder::{BooleanBuilder, Float64Builder, StringBuilder, UInt8Builder};
8use arrow_array::{ArrayRef, BooleanArray, Float64Array, StringArray, UInt8Array, UInt32Array};
9use once_cell::sync::OnceCell;
10
11use formualizer_common::{ExcelError, ExcelErrorKind, LiteralValue};
12use rustc_hash::FxHashMap;
13use std::collections::HashMap;
14
/// Per-row type discriminant, stored as one byte per row in a chunk's `UInt8` tag lane.
///
/// The explicit discriminant of each variant is the byte value written to that lane.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TypeTag {
    /// Blank cell.
    Empty = 0,
    /// Numeric value (integers and floats share this tag).
    Number = 1,
    /// Boolean value.
    Boolean = 2,
    /// Text value.
    Text = 3,
    /// Error value.
    Error = 4,
    /// Date, time, or date-time value (dedicated temporal lanes are reserved for the future).
    DateTime = 5,
    /// Duration value (dedicated lane reserved).
    Duration = 6,
    /// Value that has not been produced yet.
    Pending = 7,
}
28
29impl TypeTag {
30    fn from_value(v: &LiteralValue) -> Self {
31        match v {
32            LiteralValue::Empty => TypeTag::Empty,
33            LiteralValue::Int(_) | LiteralValue::Number(_) => TypeTag::Number,
34            LiteralValue::Boolean(_) => TypeTag::Boolean,
35            LiteralValue::Text(_) => TypeTag::Text,
36            LiteralValue::Error(_) => TypeTag::Error,
37            LiteralValue::Date(_) | LiteralValue::DateTime(_) | LiteralValue::Time(_) => {
38                TypeTag::DateTime
39            }
40            LiteralValue::Duration(_) => TypeTag::Duration,
41            LiteralValue::Pending => TypeTag::Pending,
42            LiteralValue::Array(_) => TypeTag::Error, // arrays not storable in a single cell lane
43        }
44    }
45}
46
47impl TypeTag {
48    #[inline]
49    pub fn from_u8(b: u8) -> Self {
50        match b {
51            x if x == TypeTag::Empty as u8 => TypeTag::Empty,
52            x if x == TypeTag::Number as u8 => TypeTag::Number,
53            x if x == TypeTag::Boolean as u8 => TypeTag::Boolean,
54            x if x == TypeTag::Text as u8 => TypeTag::Text,
55            x if x == TypeTag::Error as u8 => TypeTag::Error,
56            x if x == TypeTag::DateTime as u8 => TypeTag::DateTime,
57            x if x == TypeTag::Duration as u8 => TypeTag::Duration,
58            x if x == TypeTag::Pending as u8 => TypeTag::Pending,
59            _ => TypeTag::Empty,
60        }
61    }
62}
63
/// Summary counters for one column chunk.
#[derive(Clone, Copy, Debug, Default)]
pub struct ColumnChunkMeta {
    /// Number of rows in the chunk.
    pub len: usize,
    /// Count of non-null entries in the numeric lane.
    pub non_null_num: usize,
    /// Count of non-null entries in the boolean lane.
    pub non_null_bool: usize,
    /// Count of non-null entries in the text lane.
    pub non_null_text: usize,
    /// Count of non-null entries in the error lane.
    pub non_null_err: usize,
}
72
73#[derive(Debug, Clone)]
74pub struct ColumnChunk {
75    pub numbers: Option<Arc<Float64Array>>,
76    pub booleans: Option<Arc<BooleanArray>>,
77    pub text: Option<ArrayRef>,          // Utf8 for Phase A
78    pub errors: Option<Arc<UInt8Array>>, // compact error code (UInt8)
79    pub type_tag: Arc<UInt8Array>,
80    pub formula_id: Option<Arc<UInt32Array>>, // reserved for Phase A+
81    pub meta: ColumnChunkMeta,
82    // Lazy null providers (per-chunk)
83    lazy_null_numbers: OnceCell<Arc<Float64Array>>,
84    lazy_null_booleans: OnceCell<Arc<BooleanArray>>,
85    lazy_null_text: OnceCell<ArrayRef>,
86    lazy_null_errors: OnceCell<Arc<UInt8Array>>,
87    // Cache: lowered text lane (ASCII lower), nulls preserved
88    lowered_text: OnceCell<ArrayRef>,
89    // Phase C: per-chunk overlay (delta edits since last compaction)
90    pub overlay: Overlay,
91    // Phase 0/1: separate computed overlay (formula/spill outputs)
92    pub computed_overlay: Overlay,
93}
94
95impl ColumnChunk {
96    #[inline]
97    pub fn len(&self) -> usize {
98        self.type_tag.len()
99    }
100    #[inline]
101    pub fn is_empty(&self) -> bool {
102        self.len() == 0
103    }
104    #[inline]
105    pub fn numbers_or_null(&self) -> Arc<Float64Array> {
106        if let Some(a) = &self.numbers {
107            return a.clone();
108        }
109        self.lazy_null_numbers
110            .get_or_init(|| {
111                let arr = new_null_array(&DataType::Float64, self.len());
112                Arc::new(arr.as_any().downcast_ref::<Float64Array>().unwrap().clone())
113            })
114            .clone()
115    }
116    #[inline]
117    pub fn booleans_or_null(&self) -> Arc<BooleanArray> {
118        if let Some(a) = &self.booleans {
119            return a.clone();
120        }
121        self.lazy_null_booleans
122            .get_or_init(|| {
123                let arr = new_null_array(&DataType::Boolean, self.len());
124                Arc::new(arr.as_any().downcast_ref::<BooleanArray>().unwrap().clone())
125            })
126            .clone()
127    }
128    #[inline]
129    pub fn errors_or_null(&self) -> Arc<UInt8Array> {
130        if let Some(a) = &self.errors {
131            return a.clone();
132        }
133        self.lazy_null_errors
134            .get_or_init(|| {
135                let arr = new_null_array(&DataType::UInt8, self.len());
136                Arc::new(arr.as_any().downcast_ref::<UInt8Array>().unwrap().clone())
137            })
138            .clone()
139    }
140    #[inline]
141    pub fn text_or_null(&self) -> ArrayRef {
142        if let Some(a) = &self.text {
143            return a.clone();
144        }
145        self.lazy_null_text
146            .get_or_init(|| new_null_array(&DataType::Utf8, self.len()))
147            .clone()
148    }
149
150    /// Lowercased text lane (ASCII lower), with nulls preserved. Cached per chunk.
151    pub fn text_lower_or_null(&self) -> ArrayRef {
152        if let Some(a) = self.lowered_text.get() {
153            return a.clone();
154        }
155        // Lowercase when text present; else return null Utf8
156        let out: ArrayRef = if let Some(txt) = &self.text {
157            let sa = txt.as_any().downcast_ref::<StringArray>().unwrap();
158            let mut b = arrow_array::builder::StringBuilder::with_capacity(sa.len(), sa.len() * 8);
159            for i in 0..sa.len() {
160                if sa.is_null(i) {
161                    b.append_null();
162                } else {
163                    b.append_value(sa.value(i).to_ascii_lowercase());
164                }
165            }
166            let lowered = b.finish();
167            Arc::new(lowered)
168        } else {
169            new_null_array(&DataType::Utf8, self.len())
170        };
171        self.lowered_text.get_or_init(|| out.clone());
172        out
173    }
174
175    /// Grow this chunk's logical length to `new_len` (padding with empty/null values).
176    ///
177    /// This is used to keep already-materialized chunks consistent when `ArrowSheet::nrows`
178    /// grows incrementally inside the current last chunk.
179    pub fn grow_len_to(&mut self, new_len: usize) {
180        let old_len = self.len();
181        if new_len <= old_len {
182            return;
183        }
184
185        // Grow type tags (pad with Empty).
186        let mut tags: Vec<u8> = self.type_tag.values().to_vec();
187        tags.resize(new_len, TypeTag::Empty as u8);
188        self.type_tag = Arc::new(UInt8Array::from(tags));
189
190        // Grow lanes when present; append nulls for new rows.
191        if let Some(a) = &self.numbers {
192            use arrow_array::builder::Float64Builder;
193            let mut b = Float64Builder::with_capacity(new_len);
194            for i in 0..old_len {
195                if a.is_null(i) {
196                    b.append_null();
197                } else {
198                    b.append_value(a.value(i));
199                }
200            }
201            for _ in old_len..new_len {
202                b.append_null();
203            }
204            self.numbers = Some(Arc::new(b.finish()));
205        }
206        if let Some(a) = &self.booleans {
207            use arrow_array::builder::BooleanBuilder;
208            let mut b = BooleanBuilder::with_capacity(new_len);
209            for i in 0..old_len {
210                if a.is_null(i) {
211                    b.append_null();
212                } else {
213                    b.append_value(a.value(i));
214                }
215            }
216            for _ in old_len..new_len {
217                b.append_null();
218            }
219            self.booleans = Some(Arc::new(b.finish()));
220        }
221        if let Some(a) = &self.errors {
222            use arrow_array::builder::UInt8Builder;
223            let mut b = UInt8Builder::with_capacity(new_len);
224            for i in 0..old_len {
225                if a.is_null(i) {
226                    b.append_null();
227                } else {
228                    b.append_value(a.value(i));
229                }
230            }
231            for _ in old_len..new_len {
232                b.append_null();
233            }
234            self.errors = Some(Arc::new(b.finish()));
235        }
236        if let Some(a) = &self.text {
237            use arrow_array::builder::StringBuilder;
238            let sa = a.as_any().downcast_ref::<StringArray>().unwrap();
239            let mut b = StringBuilder::with_capacity(new_len, 0);
240            for i in 0..old_len {
241                if sa.is_null(i) {
242                    b.append_null();
243                } else {
244                    b.append_value(sa.value(i));
245                }
246            }
247            for _ in old_len..new_len {
248                b.append_null();
249            }
250            self.text = Some(Arc::new(b.finish()) as ArrayRef);
251        }
252
253        // Length-dependent caches must be dropped.
254        self.lazy_null_numbers = OnceCell::new();
255        self.lazy_null_booleans = OnceCell::new();
256        self.lazy_null_text = OnceCell::new();
257        self.lazy_null_errors = OnceCell::new();
258        self.lowered_text = OnceCell::new();
259
260        self.meta.len = new_len;
261    }
262}
263
264#[derive(Debug, Clone)]
265pub struct ArrowColumn {
266    pub chunks: Vec<ColumnChunk>,
267    pub sparse_chunks: FxHashMap<usize, ColumnChunk>,
268    pub index: u32,
269}
270
271impl ArrowColumn {
272    #[inline]
273    pub fn chunk(&self, idx: usize) -> Option<&ColumnChunk> {
274        if idx < self.chunks.len() {
275            Some(&self.chunks[idx])
276        } else {
277            self.sparse_chunks.get(&idx)
278        }
279    }
280
281    #[inline]
282    pub fn chunk_mut(&mut self, idx: usize) -> Option<&mut ColumnChunk> {
283        if idx < self.chunks.len() {
284            Some(&mut self.chunks[idx])
285        } else {
286            self.sparse_chunks.get_mut(&idx)
287        }
288    }
289
290    #[inline]
291    pub fn has_sparse_chunks(&self) -> bool {
292        !self.sparse_chunks.is_empty()
293    }
294
295    #[inline]
296    pub fn total_chunk_count(&self) -> usize {
297        self.chunks.len() + self.sparse_chunks.len()
298    }
299}
300
301#[derive(Debug, Clone)]
302pub struct ArrowSheet {
303    pub name: Arc<str>,
304    pub columns: Vec<ArrowColumn>,
305    pub nrows: u32,
306    pub chunk_starts: Vec<usize>,
307    /// Preferred chunk size (rows) for capacity growth operations.
308    ///
309    /// For Arrow-ingested sheets this matches the ingest `chunk_rows`. For sparse/overlay-created
310    /// sheets this defaults to 32k to avoid creating thousands of tiny chunks during growth.
311    pub chunk_rows: usize,
312}
313
314#[derive(Debug, Default, Clone)]
315pub struct SheetStore {
316    pub sheets: Vec<ArrowSheet>,
317}
318
319impl SheetStore {
320    pub fn sheet(&self, name: &str) -> Option<&ArrowSheet> {
321        self.sheets.iter().find(|s| s.name.as_ref() == name)
322    }
323    pub fn sheet_mut(&mut self, name: &str) -> Option<&mut ArrowSheet> {
324        self.sheets.iter_mut().find(|s| s.name.as_ref() == name)
325    }
326}
327
328/// Ingestion builder that writes per-column Arrow arrays with a lane/tag design.
329pub struct IngestBuilder {
330    name: Arc<str>,
331    ncols: usize,
332    chunk_rows: usize,
333    date_system: crate::engine::DateSystem,
334
335    // Per-column active builders for current chunk
336    num_builders: Vec<Float64Builder>,
337    bool_builders: Vec<BooleanBuilder>,
338    text_builders: Vec<StringBuilder>,
339    err_builders: Vec<UInt8Builder>,
340    tag_builders: Vec<UInt8Builder>,
341
342    // Per-column per-lane non-null counters for current chunk
343    lane_counts: Vec<LaneCounts>,
344
345    // Accumulated chunks
346    chunks: Vec<Vec<ColumnChunk>>, // indexed by col
347    row_in_chunk: usize,
348    total_rows: u32,
349}
350
/// Non-null value counts per lane for one column of the chunk being built.
#[derive(Clone, Copy, Debug, Default)]
struct LaneCounts {
    /// Values written to the numeric lane.
    n_num: usize,
    /// Values written to the boolean lane.
    n_bool: usize,
    /// Values written to the text lane.
    n_text: usize,
    /// Values written to the error lane.
    n_err: usize,
}
358
359impl IngestBuilder {
360    pub fn new(
361        sheet_name: &str,
362        ncols: usize,
363        chunk_rows: usize,
364        date_system: crate::engine::DateSystem,
365    ) -> Self {
366        let mut chunks = Vec::with_capacity(ncols);
367        chunks.resize_with(ncols, Vec::new);
368        Self {
369            name: Arc::from(sheet_name.to_string()),
370            ncols,
371            chunk_rows: chunk_rows.max(1),
372            date_system,
373            num_builders: (0..ncols)
374                .map(|_| Float64Builder::with_capacity(chunk_rows))
375                .collect(),
376            bool_builders: (0..ncols)
377                .map(|_| BooleanBuilder::with_capacity(chunk_rows))
378                .collect(),
379            text_builders: (0..ncols)
380                .map(|_| StringBuilder::with_capacity(chunk_rows, chunk_rows * 12))
381                .collect(),
382            err_builders: (0..ncols)
383                .map(|_| UInt8Builder::with_capacity(chunk_rows))
384                .collect(),
385            tag_builders: (0..ncols)
386                .map(|_| UInt8Builder::with_capacity(chunk_rows))
387                .collect(),
388            lane_counts: vec![LaneCounts::default(); ncols],
389            chunks,
390            row_in_chunk: 0,
391            total_rows: 0,
392        }
393    }
394
395    /// Zero-allocation row append from typed cell tokens (no LiteralValue).
396    /// Text borrows are copied into the internal StringBuilder.
397    pub fn append_row_cells<'a>(&mut self, row: &[CellIngest<'a>]) -> Result<(), ExcelError> {
398        assert_eq!(row.len(), self.ncols, "row width mismatch");
399        for (c, cell) in row.iter().enumerate() {
400            match cell {
401                CellIngest::Empty => {
402                    self.tag_builders[c].append_value(TypeTag::Empty as u8);
403                    self.num_builders[c].append_null();
404                    self.bool_builders[c].append_null();
405                    self.text_builders[c].append_null();
406                    self.err_builders[c].append_null();
407                }
408                CellIngest::Number(n) => {
409                    self.tag_builders[c].append_value(TypeTag::Number as u8);
410                    self.num_builders[c].append_value(*n);
411                    self.lane_counts[c].n_num += 1;
412                    self.bool_builders[c].append_null();
413                    self.text_builders[c].append_null();
414                    self.err_builders[c].append_null();
415                }
416                CellIngest::Boolean(b) => {
417                    self.tag_builders[c].append_value(TypeTag::Boolean as u8);
418                    self.num_builders[c].append_null();
419                    self.bool_builders[c].append_value(*b);
420                    self.lane_counts[c].n_bool += 1;
421                    self.text_builders[c].append_null();
422                    self.err_builders[c].append_null();
423                }
424                CellIngest::Text(s) => {
425                    self.tag_builders[c].append_value(TypeTag::Text as u8);
426                    self.num_builders[c].append_null();
427                    self.bool_builders[c].append_null();
428                    self.text_builders[c].append_value(s);
429                    self.lane_counts[c].n_text += 1;
430                    self.err_builders[c].append_null();
431                }
432                CellIngest::ErrorCode(code) => {
433                    self.tag_builders[c].append_value(TypeTag::Error as u8);
434                    self.num_builders[c].append_null();
435                    self.bool_builders[c].append_null();
436                    self.text_builders[c].append_null();
437                    self.err_builders[c].append_value(*code);
438                    self.lane_counts[c].n_err += 1;
439                }
440                CellIngest::DateSerial(serial) => {
441                    self.tag_builders[c].append_value(TypeTag::DateTime as u8);
442                    self.num_builders[c].append_value(*serial);
443                    self.lane_counts[c].n_num += 1;
444                    self.bool_builders[c].append_null();
445                    self.text_builders[c].append_null();
446                    self.err_builders[c].append_null();
447                }
448                CellIngest::Pending => {
449                    self.tag_builders[c].append_value(TypeTag::Pending as u8);
450                    self.num_builders[c].append_null();
451                    self.bool_builders[c].append_null();
452                    self.text_builders[c].append_null();
453                    self.err_builders[c].append_null();
454                }
455            }
456        }
457        self.row_in_chunk += 1;
458        self.total_rows += 1;
459        if self.row_in_chunk >= self.chunk_rows {
460            self.finish_chunk();
461        }
462        Ok(())
463    }
464
465    /// Streaming row append from an iterator of typed cell tokens.
466    /// Requires an `ExactSizeIterator` to validate row width without materializing a Vec.
467    pub fn append_row_cells_iter<'a, I>(&mut self, iter: I) -> Result<(), ExcelError>
468    where
469        I: ExactSizeIterator<Item = CellIngest<'a>>,
470    {
471        assert_eq!(iter.len(), self.ncols, "row width mismatch");
472        for (c, cell) in iter.enumerate() {
473            match cell {
474                CellIngest::Empty => {
475                    self.tag_builders[c].append_value(TypeTag::Empty as u8);
476                    self.num_builders[c].append_null();
477                    self.bool_builders[c].append_null();
478                    self.text_builders[c].append_null();
479                    self.err_builders[c].append_null();
480                }
481                CellIngest::Number(n) => {
482                    self.tag_builders[c].append_value(TypeTag::Number as u8);
483                    self.num_builders[c].append_value(n);
484                    self.lane_counts[c].n_num += 1;
485                    self.bool_builders[c].append_null();
486                    self.text_builders[c].append_null();
487                    self.err_builders[c].append_null();
488                }
489                CellIngest::Boolean(b) => {
490                    self.tag_builders[c].append_value(TypeTag::Boolean as u8);
491                    self.num_builders[c].append_null();
492                    self.bool_builders[c].append_value(b);
493                    self.lane_counts[c].n_bool += 1;
494                    self.text_builders[c].append_null();
495                    self.err_builders[c].append_null();
496                }
497                CellIngest::Text(s) => {
498                    self.tag_builders[c].append_value(TypeTag::Text as u8);
499                    self.num_builders[c].append_null();
500                    self.bool_builders[c].append_null();
501                    self.text_builders[c].append_value(s);
502                    self.lane_counts[c].n_text += 1;
503                    self.err_builders[c].append_null();
504                }
505                CellIngest::ErrorCode(code) => {
506                    self.tag_builders[c].append_value(TypeTag::Error as u8);
507                    self.num_builders[c].append_null();
508                    self.bool_builders[c].append_null();
509                    self.text_builders[c].append_null();
510                    self.err_builders[c].append_value(code);
511                    self.lane_counts[c].n_err += 1;
512                }
513                CellIngest::DateSerial(serial) => {
514                    self.tag_builders[c].append_value(TypeTag::DateTime as u8);
515                    self.num_builders[c].append_value(serial);
516                    self.lane_counts[c].n_num += 1;
517                    self.bool_builders[c].append_null();
518                    self.text_builders[c].append_null();
519                    self.err_builders[c].append_null();
520                }
521                CellIngest::Pending => {
522                    self.tag_builders[c].append_value(TypeTag::Pending as u8);
523                    self.num_builders[c].append_null();
524                    self.bool_builders[c].append_null();
525                    self.text_builders[c].append_null();
526                    self.err_builders[c].append_null();
527                }
528            }
529        }
530        self.row_in_chunk += 1;
531        self.total_rows += 1;
532        if self.row_in_chunk >= self.chunk_rows {
533            self.finish_chunk();
534        }
535        Ok(())
536    }
537
538    /// Append a single row of values. Length must match `ncols`.
539    pub fn append_row(&mut self, row: &[LiteralValue]) -> Result<(), ExcelError> {
540        assert_eq!(row.len(), self.ncols, "row width mismatch");
541
542        for (c, v) in row.iter().enumerate() {
543            let tag = TypeTag::from_value(v) as u8;
544            self.tag_builders[c].append_value(tag);
545
546            match v {
547                LiteralValue::Empty => {
548                    self.num_builders[c].append_null();
549                    self.bool_builders[c].append_null();
550                    self.text_builders[c].append_null();
551                    self.err_builders[c].append_null();
552                }
553                LiteralValue::Int(i) => {
554                    self.num_builders[c].append_value(*i as f64);
555                    self.lane_counts[c].n_num += 1;
556                    self.bool_builders[c].append_null();
557                    self.text_builders[c].append_null();
558                    self.err_builders[c].append_null();
559                }
560                LiteralValue::Number(n) => {
561                    self.num_builders[c].append_value(*n);
562                    self.lane_counts[c].n_num += 1;
563                    self.bool_builders[c].append_null();
564                    self.text_builders[c].append_null();
565                    self.err_builders[c].append_null();
566                }
567                LiteralValue::Boolean(b) => {
568                    self.num_builders[c].append_null();
569                    self.bool_builders[c].append_value(*b);
570                    self.lane_counts[c].n_bool += 1;
571                    self.text_builders[c].append_null();
572                    self.err_builders[c].append_null();
573                }
574                LiteralValue::Text(s) => {
575                    self.num_builders[c].append_null();
576                    self.bool_builders[c].append_null();
577                    self.text_builders[c].append_value(s);
578                    self.lane_counts[c].n_text += 1;
579                    self.err_builders[c].append_null();
580                }
581                LiteralValue::Error(e) => {
582                    self.num_builders[c].append_null();
583                    self.bool_builders[c].append_null();
584                    self.text_builders[c].append_null();
585                    self.err_builders[c].append_value(map_error_code(e.kind));
586                    self.lane_counts[c].n_err += 1;
587                }
588                // Phase A: coerce temporal to serials in numeric lane with DateTime tag
589                LiteralValue::Date(d) => {
590                    let dt = d.and_hms_opt(0, 0, 0).unwrap();
591                    let serial =
592                        crate::builtins::datetime::datetime_to_serial_for(self.date_system, &dt);
593                    self.num_builders[c].append_value(serial);
594                    self.lane_counts[c].n_num += 1;
595                    self.bool_builders[c].append_null();
596                    self.text_builders[c].append_null();
597                    self.err_builders[c].append_null();
598                }
599                LiteralValue::DateTime(dt) => {
600                    let serial =
601                        crate::builtins::datetime::datetime_to_serial_for(self.date_system, dt);
602                    self.num_builders[c].append_value(serial);
603                    self.lane_counts[c].n_num += 1;
604                    self.bool_builders[c].append_null();
605                    self.text_builders[c].append_null();
606                    self.err_builders[c].append_null();
607                }
608                LiteralValue::Time(t) => {
609                    let serial = t.num_seconds_from_midnight() as f64 / 86_400.0;
610                    self.num_builders[c].append_value(serial);
611                    self.lane_counts[c].n_num += 1;
612                    self.bool_builders[c].append_null();
613                    self.text_builders[c].append_null();
614                    self.err_builders[c].append_null();
615                }
616                LiteralValue::Duration(dur) => {
617                    let serial = dur.num_seconds() as f64 / 86_400.0;
618                    self.num_builders[c].append_value(serial);
619                    self.lane_counts[c].n_num += 1;
620                    self.bool_builders[c].append_null();
621                    self.text_builders[c].append_null();
622                    self.err_builders[c].append_null();
623                }
624                LiteralValue::Array(_) => {
625                    // Not allowed as a stored scalar; mark as error kind VALUE
626                    self.num_builders[c].append_null();
627                    self.bool_builders[c].append_null();
628                    self.text_builders[c].append_null();
629                    self.err_builders[c].append_value(map_error_code(ExcelErrorKind::Value));
630                    self.lane_counts[c].n_err += 1;
631                }
632                LiteralValue::Pending => {
633                    // Pending: tag only; all lanes remain null (no error)
634                    self.num_builders[c].append_null();
635                    self.bool_builders[c].append_null();
636                    self.text_builders[c].append_null();
637                    self.err_builders[c].append_null();
638                }
639            }
640        }
641
642        self.row_in_chunk += 1;
643        self.total_rows += 1;
644
645        if self.row_in_chunk >= self.chunk_rows {
646            self.finish_chunk();
647        }
648
649        Ok(())
650    }
651
652    fn finish_chunk(&mut self) {
653        if self.row_in_chunk == 0 {
654            return;
655        }
656        for c in 0..self.ncols {
657            let len = self.row_in_chunk;
658            let numbers_arc: Option<Arc<Float64Array>> = if self.lane_counts[c].n_num == 0 {
659                None
660            } else {
661                Some(Arc::new(self.num_builders[c].finish()))
662            };
663            let booleans_arc: Option<Arc<BooleanArray>> = if self.lane_counts[c].n_bool == 0 {
664                None
665            } else {
666                Some(Arc::new(self.bool_builders[c].finish()))
667            };
668            let text_ref: Option<ArrayRef> = if self.lane_counts[c].n_text == 0 {
669                None
670            } else {
671                Some(Arc::new(self.text_builders[c].finish()))
672            };
673            let errors_arc: Option<Arc<UInt8Array>> = if self.lane_counts[c].n_err == 0 {
674                None
675            } else {
676                Some(Arc::new(self.err_builders[c].finish()))
677            };
678            let tags: UInt8Array = self.tag_builders[c].finish();
679
680            let chunk = ColumnChunk {
681                numbers: numbers_arc,
682                booleans: booleans_arc,
683                text: text_ref,
684                errors: errors_arc,
685                type_tag: Arc::new(tags),
686                formula_id: None,
687                meta: ColumnChunkMeta {
688                    len,
689                    non_null_num: self.lane_counts[c].n_num,
690                    non_null_bool: self.lane_counts[c].n_bool,
691                    non_null_text: self.lane_counts[c].n_text,
692                    non_null_err: self.lane_counts[c].n_err,
693                },
694                lazy_null_numbers: OnceCell::new(),
695                lazy_null_booleans: OnceCell::new(),
696                lazy_null_text: OnceCell::new(),
697                lazy_null_errors: OnceCell::new(),
698                lowered_text: OnceCell::new(),
699                overlay: Overlay::new(),
700                computed_overlay: Overlay::new(),
701            };
702            self.chunks[c].push(chunk);
703
704            // re-init builders for next chunk
705            self.num_builders[c] = Float64Builder::with_capacity(self.chunk_rows);
706            self.bool_builders[c] = BooleanBuilder::with_capacity(self.chunk_rows);
707            self.text_builders[c] =
708                StringBuilder::with_capacity(self.chunk_rows, self.chunk_rows * 12);
709            self.err_builders[c] = UInt8Builder::with_capacity(self.chunk_rows);
710            self.tag_builders[c] = UInt8Builder::with_capacity(self.chunk_rows);
711            self.lane_counts[c] = LaneCounts::default();
712        }
713        self.row_in_chunk = 0;
714    }
715
716    pub fn finish(mut self) -> ArrowSheet {
717        // flush partial chunk
718        if self.row_in_chunk > 0 {
719            self.finish_chunk();
720        }
721
722        let mut columns = Vec::with_capacity(self.ncols);
723        for (idx, chunks) in self.chunks.into_iter().enumerate() {
724            columns.push(ArrowColumn {
725                chunks,
726                sparse_chunks: FxHashMap::default(),
727                index: idx as u32,
728            });
729        }
730        // Precompute chunk starts from first column and enforce alignment across columns
731        let mut chunk_starts: Vec<usize> = Vec::new();
732        if let Some(col0) = columns.first() {
733            let chunks_len0 = col0.chunks.len();
734            for (ci, col) in columns.iter().enumerate() {
735                if col.chunks.len() != chunks_len0 {
736                    panic!(
737                        "ArrowSheet chunk misalignment: column {} chunks={} != {}",
738                        ci,
739                        col.chunks.len(),
740                        chunks_len0
741                    );
742                }
743            }
744            let mut cur = 0usize;
745            for i in 0..chunks_len0 {
746                let len_i = col0.chunks[i].type_tag.len();
747                for (ci, col) in columns.iter().enumerate() {
748                    let got = col.chunks[i].type_tag.len();
749                    if got != len_i {
750                        panic!(
751                            "ArrowSheet chunk row-length misalignment at chunk {i}: col {ci} len={got} != {len_i}"
752                        );
753                    }
754                }
755                chunk_starts.push(cur);
756                cur += len_i;
757            }
758        }
759        ArrowSheet {
760            name: self.name,
761            columns,
762            nrows: self.total_rows,
763            chunk_starts,
764            chunk_rows: self.chunk_rows,
765        }
766    }
767}
768
769pub fn map_error_code(kind: ExcelErrorKind) -> u8 {
770    match kind {
771        ExcelErrorKind::Null => 1,
772        ExcelErrorKind::Ref => 2,
773        ExcelErrorKind::Name => 3,
774        ExcelErrorKind::Value => 4,
775        ExcelErrorKind::Div => 5,
776        ExcelErrorKind::Na => 6,
777        ExcelErrorKind::Num => 7,
778        ExcelErrorKind::Error => 8,
779        ExcelErrorKind::NImpl => 9,
780        ExcelErrorKind::Spill => 10,
781        ExcelErrorKind::Calc => 11,
782        ExcelErrorKind::Circ => 12,
783        ExcelErrorKind::Cancelled => 13,
784    }
785}
786
787pub fn unmap_error_code(code: u8) -> ExcelErrorKind {
788    match code {
789        1 => ExcelErrorKind::Null,
790        2 => ExcelErrorKind::Ref,
791        3 => ExcelErrorKind::Name,
792        4 => ExcelErrorKind::Value,
793        5 => ExcelErrorKind::Div,
794        6 => ExcelErrorKind::Na,
795        7 => ExcelErrorKind::Num,
796        8 => ExcelErrorKind::Error,
797        9 => ExcelErrorKind::NImpl,
798        10 => ExcelErrorKind::Spill,
799        11 => ExcelErrorKind::Calc,
800        12 => ExcelErrorKind::Circ,
801        13 => ExcelErrorKind::Cancelled,
802        _ => ExcelErrorKind::Error,
803    }
804}
805
806// ─────────────────────────── Overlay (Phase C) ────────────────────────────
807
/// Zero-allocation cell token for ingestion.
///
/// Every variant is either a plain scalar or a borrowed string slice, so the token is
/// `Copy` and can be passed around, compared and debug-printed without allocating.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CellIngest<'a> {
    /// Cell with no value.
    Empty,
    /// Numeric value.
    Number(f64),
    /// Boolean value.
    Boolean(bool),
    /// Text borrowed from the caller for the duration of `'a`.
    Text(&'a str),
    /// Error given as a raw `u8` code.
    // NOTE(review): presumably the same encoding as `map_error_code` — confirm with callers.
    ErrorCode(u8),
    /// Date-like value given as a serial number.
    // NOTE(review): presumably an Excel date serial — confirm with callers.
    DateSerial(f64),
    /// Value that is not available yet.
    Pending,
}
818
/// A single cell value held in an [`Overlay`], layered on top of a chunk's base lanes.
#[derive(Debug, Clone)]
pub enum OverlayValue {
    /// Cell explicitly overridden to empty.
    Empty,
    /// Plain numeric value.
    Number(f64),
    /// Date/Time/DateTime stored as an Excel serial in the numeric lane.
    DateTime(f64),
    /// Duration stored as an Excel-style day-fraction in the numeric lane.
    Duration(f64),
    /// Boolean value.
    Boolean(bool),
    /// Shared, immutable text.
    Text(Arc<str>),
    /// Error stored as a `u8` code (decoded with `unmap_error_code` when read).
    Error(u8),
    /// Value that is not available yet.
    Pending,
}

impl OverlayValue {
    /// Deterministic estimate of the payload size in bytes, used for overlay memory
    /// accounting. Text counts its string bytes only; scalars count the size of their
    /// primitive; `Empty` and `Pending` carry no payload.
    #[inline]
    fn estimated_payload_bytes(&self) -> usize {
        use core::mem::size_of;

        match self {
            Self::Text(text) => text.len(),
            Self::Number(_) | Self::DateTime(_) | Self::Duration(_) => size_of::<f64>(),
            Self::Boolean(_) => size_of::<bool>(),
            Self::Error(_) => size_of::<u8>(),
            Self::Empty | Self::Pending => 0,
        }
    }
}
848
849#[derive(Debug, Default, Clone)]
850pub struct Overlay {
851    map: HashMap<usize, OverlayValue>,
852    // Deterministic (and intentionally approximate) accounting of overlay memory.
853    // This is used for budget enforcement/observability; it does not attempt to reflect
854    // the allocator's exact overhead.
855    estimated_bytes: usize,
856}
857
858impl Overlay {
859    // Deterministic estimate per entry to keep budget enforcement stable across platforms.
860    // Includes key + map/node overhead (approx) and value payload bytes.
861    const ENTRY_BASE_BYTES: usize = 32;
862
863    pub fn new() -> Self {
864        Self {
865            map: HashMap::new(),
866            estimated_bytes: 0,
867        }
868    }
869    #[inline]
870    pub fn get(&self, off: usize) -> Option<&OverlayValue> {
871        self.map.get(&off)
872    }
873    #[inline]
874    pub fn set(&mut self, off: usize, v: OverlayValue) -> isize {
875        let new_est = Self::ENTRY_BASE_BYTES + v.estimated_payload_bytes();
876        let old_est = self
877            .map
878            .get(&off)
879            .map(|old| Self::ENTRY_BASE_BYTES + old.estimated_payload_bytes())
880            .unwrap_or(0);
881        self.map.insert(off, v);
882        let delta = new_est as isize - old_est as isize;
883        if delta >= 0 {
884            self.estimated_bytes = self.estimated_bytes.saturating_add(delta as usize);
885        } else {
886            self.estimated_bytes = self.estimated_bytes.saturating_sub((-delta) as usize);
887        }
888        delta
889    }
890
891    #[inline]
892    pub fn remove(&mut self, off: usize) -> isize {
893        let Some(old) = self.map.remove(&off) else {
894            return 0;
895        };
896        let old_est = Self::ENTRY_BASE_BYTES + old.estimated_payload_bytes();
897        self.estimated_bytes = self.estimated_bytes.saturating_sub(old_est);
898        -(old_est as isize)
899    }
900    #[inline]
901    pub fn clear(&mut self) -> usize {
902        let freed = self.estimated_bytes;
903        self.map.clear();
904        self.estimated_bytes = 0;
905        freed
906    }
907    #[inline]
908    pub fn len(&self) -> usize {
909        self.map.len()
910    }
911
912    #[inline]
913    pub fn estimated_bytes(&self) -> usize {
914        self.estimated_bytes
915    }
916    #[inline]
917    pub fn is_empty(&self) -> bool {
918        self.map.is_empty()
919    }
920    #[inline]
921    pub fn any_in_range(&self, range: core::ops::Range<usize>) -> bool {
922        self.map.keys().any(|k| range.contains(k))
923    }
924
925    /// Iterate over all `(offset, value)` pairs in the overlay.
926    pub fn iter(&self) -> impl Iterator<Item = (&usize, &OverlayValue)> {
927        self.map.iter()
928    }
929}
930
931impl ArrowSheet {
932    /// Return a summary of each column's chunk counts, total rows, and lane presence.
933    pub fn shape(&self) -> Vec<ColumnShape> {
934        self.columns
935            .iter()
936            .map(|c| {
937                let chunks = c.chunks.len();
938                let rows = self.nrows as usize;
939                let has_num = c.chunks.iter().any(|ch| ch.meta.non_null_num > 0);
940                let has_bool = c.chunks.iter().any(|ch| ch.meta.non_null_bool > 0);
941                let has_text = c.chunks.iter().any(|ch| ch.meta.non_null_text > 0);
942                let has_err = c.chunks.iter().any(|ch| ch.meta.non_null_err > 0);
943                ColumnShape {
944                    index: c.index,
945                    chunks,
946                    rows,
947                    has_num,
948                    has_bool,
949                    has_text,
950                    has_err,
951                }
952            })
953            .collect()
954    }
955
956    pub fn range_view(
957        &self,
958        sr: usize,
959        sc: usize,
960        er: usize,
961        ec: usize,
962    ) -> crate::engine::range_view::RangeView<'_> {
963        let r0 = er.checked_sub(sr).map(|d| d + 1).unwrap_or(0);
964        let c0 = ec.checked_sub(sc).map(|d| d + 1).unwrap_or(0);
965        let (rows, cols) = if r0 == 0 || c0 == 0 { (0, 0) } else { (r0, c0) };
966        crate::engine::range_view::RangeView::new(
967            crate::engine::range_view::RangeBacking::Borrowed(self),
968            sr,
969            sc,
970            er,
971            ec,
972            rows,
973            cols,
974        )
975    }
976
977    /// Fast single-cell read (0-based row/col) with overlay precedence.
978    ///
979    /// This avoids constructing a 1x1 RangeView and is intended for tight read loops.
980    #[inline]
981    pub fn get_cell_value(&self, abs_row: usize, abs_col: usize) -> LiteralValue {
982        let sheet_rows = self.nrows as usize;
983        if abs_row >= sheet_rows {
984            return LiteralValue::Empty;
985        }
986        if abs_col >= self.columns.len() {
987            return LiteralValue::Empty;
988        }
989        let Some((ch_idx, in_off)) = self.chunk_of_row(abs_row) else {
990            return LiteralValue::Empty;
991        };
992        let col_ref = &self.columns[abs_col];
993        let Some(ch) = col_ref.chunk(ch_idx) else {
994            return LiteralValue::Empty;
995        };
996
997        // Overlay takes precedence: user edits over computed over base.
998        if let Some(ov) = ch
999            .overlay
1000            .get(in_off)
1001            .or_else(|| ch.computed_overlay.get(in_off))
1002        {
1003            return match ov {
1004                OverlayValue::Empty => LiteralValue::Empty,
1005                OverlayValue::Number(n) => LiteralValue::Number(*n),
1006                OverlayValue::DateTime(serial) => LiteralValue::from_serial_number(*serial),
1007                OverlayValue::Duration(serial) => {
1008                    let nanos_f = *serial * 86_400.0 * 1_000_000_000.0;
1009                    let nanos = nanos_f.round().clamp(i64::MIN as f64, i64::MAX as f64) as i64;
1010                    LiteralValue::Duration(chrono::Duration::nanoseconds(nanos))
1011                }
1012                OverlayValue::Boolean(b) => LiteralValue::Boolean(*b),
1013                OverlayValue::Text(s) => LiteralValue::Text((**s).to_string()),
1014                OverlayValue::Error(code) => {
1015                    let kind = unmap_error_code(*code);
1016                    LiteralValue::Error(ExcelError::new(kind))
1017                }
1018                OverlayValue::Pending => LiteralValue::Pending,
1019            };
1020        }
1021
1022        // Read tag and route to lane.
1023        let tag_u8 = ch.type_tag.value(in_off);
1024        match TypeTag::from_u8(tag_u8) {
1025            TypeTag::Empty => LiteralValue::Empty,
1026            TypeTag::Number => {
1027                if let Some(arr) = &ch.numbers {
1028                    if arr.is_null(in_off) {
1029                        return LiteralValue::Empty;
1030                    }
1031                    LiteralValue::Number(arr.value(in_off))
1032                } else {
1033                    LiteralValue::Empty
1034                }
1035            }
1036            TypeTag::DateTime => {
1037                if let Some(arr) = &ch.numbers {
1038                    if arr.is_null(in_off) {
1039                        return LiteralValue::Empty;
1040                    }
1041                    LiteralValue::from_serial_number(arr.value(in_off))
1042                } else {
1043                    LiteralValue::Empty
1044                }
1045            }
1046            TypeTag::Duration => {
1047                if let Some(arr) = &ch.numbers {
1048                    if arr.is_null(in_off) {
1049                        return LiteralValue::Empty;
1050                    }
1051                    let serial = arr.value(in_off);
1052                    let nanos_f = serial * 86_400.0 * 1_000_000_000.0;
1053                    let nanos = nanos_f.round().clamp(i64::MIN as f64, i64::MAX as f64) as i64;
1054                    LiteralValue::Duration(chrono::Duration::nanoseconds(nanos))
1055                } else {
1056                    LiteralValue::Empty
1057                }
1058            }
1059            TypeTag::Boolean => {
1060                if let Some(arr) = &ch.booleans {
1061                    if arr.is_null(in_off) {
1062                        return LiteralValue::Empty;
1063                    }
1064                    LiteralValue::Boolean(arr.value(in_off))
1065                } else {
1066                    LiteralValue::Empty
1067                }
1068            }
1069            TypeTag::Text => {
1070                if let Some(arr) = &ch.text {
1071                    if arr.is_null(in_off) {
1072                        return LiteralValue::Empty;
1073                    }
1074                    let sa = arr
1075                        .as_any()
1076                        .downcast_ref::<arrow_array::StringArray>()
1077                        .unwrap();
1078                    LiteralValue::Text(sa.value(in_off).to_string())
1079                } else {
1080                    LiteralValue::Empty
1081                }
1082            }
1083            TypeTag::Error => {
1084                if let Some(arr) = &ch.errors {
1085                    if arr.is_null(in_off) {
1086                        return LiteralValue::Empty;
1087                    }
1088                    let kind = unmap_error_code(arr.value(in_off));
1089                    LiteralValue::Error(ExcelError::new(kind))
1090                } else {
1091                    LiteralValue::Empty
1092                }
1093            }
1094            TypeTag::Pending => LiteralValue::Pending,
1095        }
1096    }
1097
1098    /// Ensure capacity to address at least `target_rows` rows by extending the row chunk map.
1099    ///
1100    /// This updates `chunk_starts`/`nrows` but does **not** eagerly densify all columns with
1101    /// new empty chunks. Missing chunks are treated as all-empty and can be materialized lazily.
1102    pub fn ensure_row_capacity(&mut self, target_rows: usize) {
1103        if target_rows as u32 <= self.nrows {
1104            return;
1105        }
1106
1107        let chunk_size = self.chunk_rows.max(1);
1108
1109        // `chunk_starts` must represent fixed-size chunk boundaries based on `chunk_rows`, not
1110        // incremental growth steps. In particular, repeated calls like ensure_row_capacity(1),
1111        // ensure_row_capacity(2), ... must NOT create a new chunk per row.
1112        if self.chunk_starts.is_empty() {
1113            self.chunk_starts.push(0);
1114        }
1115
1116        // Extend chunk starts only when `target_rows` crosses a chunk boundary.
1117        // Example: chunk_size=3, target_rows=6 => chunk_starts=[0,3]
1118        let mut next_start = self
1119            .chunk_starts
1120            .last()
1121            .copied()
1122            .unwrap_or(0)
1123            .saturating_add(chunk_size);
1124        while next_start < target_rows {
1125            self.chunk_starts.push(next_start);
1126            next_start = next_start.saturating_add(chunk_size);
1127        }
1128
1129        self.nrows = target_rows as u32;
1130
1131        // Any previously-materialized chunk may have been created when the sheet had fewer rows.
1132        // When `chunk_starts` extends, chunks that used to be "last" can become interior chunks
1133        // with a larger fixed boundary. Ensure materialized chunks are grown to their current
1134        // boundary-derived length so RangeView slicing stays in-bounds.
1135        let starts = self.chunk_starts.clone();
1136        let nrows = self.nrows as usize;
1137        let required_len_for = |ch_idx: usize| -> Option<usize> {
1138            let start = *starts.get(ch_idx)?;
1139            let end = starts.get(ch_idx + 1).copied().unwrap_or(nrows);
1140            Some(end.saturating_sub(start))
1141        };
1142
1143        for col in &mut self.columns {
1144            for (idx, ch) in col.chunks.iter_mut().enumerate() {
1145                if let Some(req) = required_len_for(idx) {
1146                    ch.grow_len_to(req);
1147                }
1148            }
1149            if !col.sparse_chunks.is_empty() {
1150                let keys: Vec<usize> = col.sparse_chunks.keys().copied().collect();
1151                for idx in keys {
1152                    if let (Some(req), Some(ch)) =
1153                        (required_len_for(idx), col.sparse_chunks.get_mut(&idx))
1154                    {
1155                        ch.grow_len_to(req);
1156                    }
1157                }
1158            }
1159        }
1160    }
1161
1162    /// Ensure a mutable chunk for a given column/chunk index.
1163    ///
1164    /// If the chunk is beyond the column's dense chunk vector, it is stored in `sparse_chunks`.
1165    pub fn ensure_column_chunk_mut(
1166        &mut self,
1167        col_idx: usize,
1168        ch_idx: usize,
1169    ) -> Option<&mut ColumnChunk> {
1170        let start = *self.chunk_starts.get(ch_idx)?;
1171        let end = self
1172            .chunk_starts
1173            .get(ch_idx + 1)
1174            .copied()
1175            .unwrap_or(self.nrows as usize);
1176        let len = end.saturating_sub(start);
1177
1178        let col = self.columns.get_mut(col_idx)?;
1179        if ch_idx < col.chunks.len() {
1180            return Some(&mut col.chunks[ch_idx]);
1181        }
1182        Some(
1183            col.sparse_chunks
1184                .entry(ch_idx)
1185                .or_insert_with(|| Self::make_empty_chunk(len)),
1186        )
1187    }
1188
1189    /// Return (chunk_idx, in_chunk_offset) for absolute 0-based row.
1190    pub fn chunk_of_row(&self, abs_row: usize) -> Option<(usize, usize)> {
1191        if abs_row >= self.nrows as usize {
1192            return None;
1193        }
1194        let ch_idx = match self.chunk_starts.binary_search(&abs_row) {
1195            Ok(i) => i,
1196            Err(0) => 0,
1197            Err(i) => i - 1,
1198        };
1199        let start = self.chunk_starts[ch_idx];
1200        Some((ch_idx, abs_row - start))
1201    }
1202
1203    fn recompute_chunk_starts(&mut self) {
1204        self.chunk_starts.clear();
1205        if let Some(col0) = self.columns.first() {
1206            let mut cur = 0usize;
1207            for ch in &col0.chunks {
1208                self.chunk_starts.push(cur);
1209                cur += ch.type_tag.len();
1210            }
1211        }
1212    }
1213
1214    fn make_empty_chunk(len: usize) -> ColumnChunk {
1215        ColumnChunk {
1216            numbers: None,
1217            booleans: None,
1218            text: None,
1219            errors: None,
1220            type_tag: Arc::new(UInt8Array::from(vec![TypeTag::Empty as u8; len])),
1221            formula_id: None,
1222            meta: ColumnChunkMeta {
1223                len,
1224                non_null_num: 0,
1225                non_null_bool: 0,
1226                non_null_text: 0,
1227                non_null_err: 0,
1228            },
1229            lazy_null_numbers: OnceCell::new(),
1230            lazy_null_booleans: OnceCell::new(),
1231            lazy_null_text: OnceCell::new(),
1232            lazy_null_errors: OnceCell::new(),
1233            lowered_text: OnceCell::new(),
1234            overlay: Overlay::new(),
1235            computed_overlay: Overlay::new(),
1236        }
1237    }
1238
1239    fn slice_chunk(ch: &ColumnChunk, off: usize, len: usize) -> ColumnChunk {
1240        // Slice type tags
1241        use arrow_array::Array;
1242        let type_tag: Arc<UInt8Array> = Arc::new(
1243            Array::slice(ch.type_tag.as_ref(), off, len)
1244                .as_any()
1245                .downcast_ref::<UInt8Array>()
1246                .unwrap()
1247                .clone(),
1248        );
1249        // Slice numbers if present and keep only if any non-null
1250        let numbers: Option<Arc<Float64Array>> = ch.numbers.as_ref().and_then(|a| {
1251            let sl = Array::slice(a.as_ref(), off, len);
1252            let fa = sl.as_any().downcast_ref::<Float64Array>().unwrap().clone();
1253            let nn = len.saturating_sub(fa.null_count());
1254            if nn == 0 { None } else { Some(Arc::new(fa)) }
1255        });
1256        let booleans: Option<Arc<BooleanArray>> = ch.booleans.as_ref().and_then(|a| {
1257            let sl = Array::slice(a.as_ref(), off, len);
1258            let ba = sl.as_any().downcast_ref::<BooleanArray>().unwrap().clone();
1259            let nn = len.saturating_sub(ba.null_count());
1260            if nn == 0 { None } else { Some(Arc::new(ba)) }
1261        });
1262        let text: Option<ArrayRef> = ch.text.as_ref().and_then(|a| {
1263            let sl = Array::slice(a.as_ref(), off, len);
1264            let sa = sl.as_any().downcast_ref::<StringArray>().unwrap().clone();
1265            let nn = len.saturating_sub(sa.null_count());
1266            if nn == 0 {
1267                None
1268            } else {
1269                Some(Arc::new(sa) as ArrayRef)
1270            }
1271        });
1272        let errors: Option<Arc<UInt8Array>> = ch.errors.as_ref().and_then(|a| {
1273            let sl = Array::slice(a.as_ref(), off, len);
1274            let ea = sl.as_any().downcast_ref::<UInt8Array>().unwrap().clone();
1275            let nn = len.saturating_sub(ea.null_count());
1276            if nn == 0 { None } else { Some(Arc::new(ea)) }
1277        });
1278        // Split overlays for this slice
1279        let mut overlay = Overlay::new();
1280        for (k, v) in ch.overlay.map.iter() {
1281            if *k >= off && *k < off + len {
1282                let _ = overlay.set(*k - off, v.clone());
1283            }
1284        }
1285        let mut computed_overlay = Overlay::new();
1286        for (k, v) in ch.computed_overlay.map.iter() {
1287            if *k >= off && *k < off + len {
1288                let _ = computed_overlay.set(*k - off, v.clone());
1289            }
1290        }
1291        let non_null_num = numbers.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
1292        let non_null_bool = booleans.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
1293        let non_null_text = text.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
1294        let non_null_err = errors.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
1295        ColumnChunk {
1296            numbers: numbers.clone(),
1297            booleans: booleans.clone(),
1298            text: text.clone(),
1299            errors: errors.clone(),
1300            type_tag,
1301            formula_id: None,
1302            meta: ColumnChunkMeta {
1303                len,
1304                non_null_num,
1305                non_null_bool,
1306                non_null_text,
1307                non_null_err,
1308            },
1309            lazy_null_numbers: OnceCell::new(),
1310            lazy_null_booleans: OnceCell::new(),
1311            lazy_null_text: OnceCell::new(),
1312            lazy_null_errors: OnceCell::new(),
1313            lowered_text: OnceCell::new(),
1314            overlay,
1315            computed_overlay,
1316        }
1317    }
1318
1319    /// Heuristic compaction: rebuilds a chunk's base arrays by applying its overlay when
1320    /// overlay density crosses thresholds. Returns true if a rebuild occurred.
1321    pub fn maybe_compact_chunk(
1322        &mut self,
1323        col_idx: usize,
1324        ch_idx: usize,
1325        abs_threshold: usize,
1326        frac_den: usize,
1327    ) -> usize {
1328        if col_idx >= self.columns.len() {
1329            return 0;
1330        }
1331
1332        let (len, tags, numbers, booleans, text, errors, non_num, non_bool, non_text, non_err) = {
1333            let Some(ch_ref) = self.columns[col_idx].chunk(ch_idx) else {
1334                return 0;
1335            };
1336            let len = ch_ref.type_tag.len();
1337            if len == 0 {
1338                return 0;
1339            }
1340
1341            let ov_len = ch_ref.overlay.len();
1342            let den = frac_den.max(1);
1343            let trig = ov_len > (len / den) || ov_len > abs_threshold;
1344            if !trig {
1345                return 0;
1346            }
1347
1348            // Rebuild: merge base lanes with overlays row-by-row.
1349            let mut tag_b = UInt8Builder::with_capacity(len);
1350            let mut nb = Float64Builder::with_capacity(len);
1351            let mut bb = BooleanBuilder::with_capacity(len);
1352            let mut sb = StringBuilder::with_capacity(len, len * 8);
1353            let mut eb = UInt8Builder::with_capacity(len);
1354            let mut non_num = 0usize;
1355            let mut non_bool = 0usize;
1356            let mut non_text = 0usize;
1357            let mut non_err = 0usize;
1358
1359            for i in 0..len {
1360                // If overlay present, use it. Otherwise, use base tag+lane.
1361                if let Some(ov) = ch_ref.overlay.get(i) {
1362                    match ov {
1363                        OverlayValue::Empty => {
1364                            tag_b.append_value(TypeTag::Empty as u8);
1365                            nb.append_null();
1366                            bb.append_null();
1367                            sb.append_null();
1368                            eb.append_null();
1369                        }
1370                        OverlayValue::Number(n) => {
1371                            tag_b.append_value(TypeTag::Number as u8);
1372                            nb.append_value(*n);
1373                            non_num += 1;
1374                            bb.append_null();
1375                            sb.append_null();
1376                            eb.append_null();
1377                        }
1378                        OverlayValue::DateTime(serial) => {
1379                            tag_b.append_value(TypeTag::DateTime as u8);
1380                            nb.append_value(*serial);
1381                            non_num += 1;
1382                            bb.append_null();
1383                            sb.append_null();
1384                            eb.append_null();
1385                        }
1386                        OverlayValue::Duration(serial) => {
1387                            tag_b.append_value(TypeTag::Duration as u8);
1388                            nb.append_value(*serial);
1389                            non_num += 1;
1390                            bb.append_null();
1391                            sb.append_null();
1392                            eb.append_null();
1393                        }
1394                        OverlayValue::Boolean(b) => {
1395                            tag_b.append_value(TypeTag::Boolean as u8);
1396                            nb.append_null();
1397                            bb.append_value(*b);
1398                            non_bool += 1;
1399                            sb.append_null();
1400                            eb.append_null();
1401                        }
1402                        OverlayValue::Text(s) => {
1403                            tag_b.append_value(TypeTag::Text as u8);
1404                            nb.append_null();
1405                            bb.append_null();
1406                            sb.append_value(s);
1407                            non_text += 1;
1408                            eb.append_null();
1409                        }
1410                        OverlayValue::Error(code) => {
1411                            tag_b.append_value(TypeTag::Error as u8);
1412                            nb.append_null();
1413                            bb.append_null();
1414                            sb.append_null();
1415                            eb.append_value(*code);
1416                            non_err += 1;
1417                        }
1418                        OverlayValue::Pending => {
1419                            tag_b.append_value(TypeTag::Pending as u8);
1420                            nb.append_null();
1421                            bb.append_null();
1422                            sb.append_null();
1423                            eb.append_null();
1424                        }
1425                    }
1426                } else {
1427                    let tag = TypeTag::from_u8(ch_ref.type_tag.value(i));
1428                    match tag {
1429                        TypeTag::Empty => {
1430                            tag_b.append_value(TypeTag::Empty as u8);
1431                            nb.append_null();
1432                            bb.append_null();
1433                            sb.append_null();
1434                            eb.append_null();
1435                        }
1436                        TypeTag::Number | TypeTag::DateTime | TypeTag::Duration => {
1437                            tag_b.append_value(tag as u8);
1438                            if let Some(a) = &ch_ref.numbers {
1439                                let fa = a.as_any().downcast_ref::<Float64Array>().unwrap();
1440                                if fa.is_null(i) {
1441                                    nb.append_null();
1442                                } else {
1443                                    nb.append_value(fa.value(i));
1444                                    non_num += 1;
1445                                }
1446                            } else {
1447                                nb.append_null();
1448                            }
1449                            bb.append_null();
1450                            sb.append_null();
1451                            eb.append_null();
1452                        }
1453                        TypeTag::Boolean => {
1454                            tag_b.append_value(TypeTag::Boolean as u8);
1455                            nb.append_null();
1456                            if let Some(a) = &ch_ref.booleans {
1457                                let ba = a.as_any().downcast_ref::<BooleanArray>().unwrap();
1458                                if ba.is_null(i) {
1459                                    bb.append_null();
1460                                } else {
1461                                    bb.append_value(ba.value(i));
1462                                    non_bool += 1;
1463                                }
1464                            } else {
1465                                bb.append_null();
1466                            }
1467                            sb.append_null();
1468                            eb.append_null();
1469                        }
1470                        TypeTag::Text => {
1471                            tag_b.append_value(TypeTag::Text as u8);
1472                            nb.append_null();
1473                            bb.append_null();
1474                            if let Some(a) = &ch_ref.text {
1475                                let sa = a.as_any().downcast_ref::<StringArray>().unwrap();
1476                                if sa.is_null(i) {
1477                                    sb.append_null();
1478                                } else {
1479                                    sb.append_value(sa.value(i));
1480                                    non_text += 1;
1481                                }
1482                            } else {
1483                                sb.append_null();
1484                            }
1485                            eb.append_null();
1486                        }
1487                        TypeTag::Error => {
1488                            tag_b.append_value(TypeTag::Error as u8);
1489                            nb.append_null();
1490                            bb.append_null();
1491                            sb.append_null();
1492                            if let Some(a) = &ch_ref.errors {
1493                                let ea = a.as_any().downcast_ref::<UInt8Array>().unwrap();
1494                                if ea.is_null(i) {
1495                                    eb.append_null();
1496                                } else {
1497                                    eb.append_value(ea.value(i));
1498                                    non_err += 1;
1499                                }
1500                            } else {
1501                                eb.append_null();
1502                            }
1503                        }
1504                        TypeTag::Pending => {
1505                            tag_b.append_value(TypeTag::Pending as u8);
1506                            nb.append_null();
1507                            bb.append_null();
1508                            sb.append_null();
1509                            eb.append_null();
1510                        }
1511                    }
1512                }
1513            }
1514
1515            let tags = Arc::new(tag_b.finish());
1516            let numbers = {
1517                let a = nb.finish();
1518                if non_num == 0 {
1519                    None
1520                } else {
1521                    Some(Arc::new(a))
1522                }
1523            };
1524            let booleans = {
1525                let a = bb.finish();
1526                if non_bool == 0 {
1527                    None
1528                } else {
1529                    Some(Arc::new(a))
1530                }
1531            };
1532            let text = {
1533                let a = sb.finish();
1534                if non_text == 0 {
1535                    None
1536                } else {
1537                    Some(Arc::new(a) as ArrayRef)
1538                }
1539            };
1540            let errors = {
1541                let a = eb.finish();
1542                if non_err == 0 {
1543                    None
1544                } else {
1545                    Some(Arc::new(a))
1546                }
1547            };
1548
1549            (
1550                len, tags, numbers, booleans, text, errors, non_num, non_bool, non_text, non_err,
1551            )
1552        };
1553
1554        let Some(ch_mut) = self.columns[col_idx].chunk_mut(ch_idx) else {
1555            return 0;
1556        };
1557
1558        ch_mut.type_tag = tags;
1559        ch_mut.numbers = numbers;
1560        ch_mut.booleans = booleans;
1561        ch_mut.text = text;
1562        ch_mut.errors = errors;
1563        let freed = ch_mut.overlay.clear();
1564        ch_mut.lowered_text = OnceCell::new();
1565        ch_mut.meta.len = len;
1566        ch_mut.meta.non_null_num = non_num;
1567        ch_mut.meta.non_null_bool = non_bool;
1568        ch_mut.meta.non_null_text = non_text;
1569        ch_mut.meta.non_null_err = non_err;
1570        freed
1571    }
1572
1573    /// Compact a dense chunk's computed overlay into its base arrays, freeing overlay memory
1574    /// while preserving the data. Returns the number of bytes freed.
1575    ///
1576    /// This is the computed-overlay counterpart of `maybe_compact_chunk` (which compacts
1577    /// user-edit overlays). The read cascade is `overlay → computed_overlay → base`, so
1578    /// folding computed overlay entries into base arrays is transparent: the `overlay` layer
1579    /// (user edits) is left untouched and still takes precedence on reads.
1580    pub fn compact_computed_overlay_chunk(&mut self, col_idx: usize, ch_idx: usize) -> usize {
1581        if col_idx >= self.columns.len() {
1582            return 0;
1583        }
1584
1585        let (len, tags, numbers, booleans, text, errors, non_num, non_bool, non_text, non_err) = {
1586            let Some(ch_ref) = self.columns[col_idx].chunk(ch_idx) else {
1587                return 0;
1588            };
1589            let len = ch_ref.type_tag.len();
1590            if len == 0 || ch_ref.computed_overlay.is_empty() {
1591                return 0;
1592            }
1593
1594            let mut tag_b = UInt8Builder::with_capacity(len);
1595            let mut nb = Float64Builder::with_capacity(len);
1596            let mut bb = BooleanBuilder::with_capacity(len);
1597            let mut sb = StringBuilder::with_capacity(len, len * 8);
1598            let mut eb = UInt8Builder::with_capacity(len);
1599            let mut non_num = 0usize;
1600            let mut non_bool = 0usize;
1601            let mut non_text = 0usize;
1602            let mut non_err = 0usize;
1603
1604            for i in 0..len {
1605                if let Some(ov) = ch_ref.computed_overlay.get(i) {
1606                    match ov {
1607                        OverlayValue::Empty => {
1608                            tag_b.append_value(TypeTag::Empty as u8);
1609                            nb.append_null();
1610                            bb.append_null();
1611                            sb.append_null();
1612                            eb.append_null();
1613                        }
1614                        OverlayValue::Number(n) => {
1615                            tag_b.append_value(TypeTag::Number as u8);
1616                            nb.append_value(*n);
1617                            non_num += 1;
1618                            bb.append_null();
1619                            sb.append_null();
1620                            eb.append_null();
1621                        }
1622                        OverlayValue::DateTime(serial) => {
1623                            tag_b.append_value(TypeTag::DateTime as u8);
1624                            nb.append_value(*serial);
1625                            non_num += 1;
1626                            bb.append_null();
1627                            sb.append_null();
1628                            eb.append_null();
1629                        }
1630                        OverlayValue::Duration(serial) => {
1631                            tag_b.append_value(TypeTag::Duration as u8);
1632                            nb.append_value(*serial);
1633                            non_num += 1;
1634                            bb.append_null();
1635                            sb.append_null();
1636                            eb.append_null();
1637                        }
1638                        OverlayValue::Boolean(b) => {
1639                            tag_b.append_value(TypeTag::Boolean as u8);
1640                            nb.append_null();
1641                            bb.append_value(*b);
1642                            non_bool += 1;
1643                            sb.append_null();
1644                            eb.append_null();
1645                        }
1646                        OverlayValue::Text(s) => {
1647                            tag_b.append_value(TypeTag::Text as u8);
1648                            nb.append_null();
1649                            bb.append_null();
1650                            sb.append_value(s);
1651                            non_text += 1;
1652                            eb.append_null();
1653                        }
1654                        OverlayValue::Error(code) => {
1655                            tag_b.append_value(TypeTag::Error as u8);
1656                            nb.append_null();
1657                            bb.append_null();
1658                            sb.append_null();
1659                            eb.append_value(*code);
1660                            non_err += 1;
1661                        }
1662                        OverlayValue::Pending => {
1663                            tag_b.append_value(TypeTag::Pending as u8);
1664                            nb.append_null();
1665                            bb.append_null();
1666                            sb.append_null();
1667                            eb.append_null();
1668                        }
1669                    }
1670                } else {
1671                    let tag = TypeTag::from_u8(ch_ref.type_tag.value(i));
1672                    match tag {
1673                        TypeTag::Empty => {
1674                            tag_b.append_value(TypeTag::Empty as u8);
1675                            nb.append_null();
1676                            bb.append_null();
1677                            sb.append_null();
1678                            eb.append_null();
1679                        }
1680                        TypeTag::Number | TypeTag::DateTime | TypeTag::Duration => {
1681                            tag_b.append_value(tag as u8);
1682                            if let Some(a) = &ch_ref.numbers {
1683                                let fa = a.as_any().downcast_ref::<Float64Array>().unwrap();
1684                                if fa.is_null(i) {
1685                                    nb.append_null();
1686                                } else {
1687                                    nb.append_value(fa.value(i));
1688                                    non_num += 1;
1689                                }
1690                            } else {
1691                                nb.append_null();
1692                            }
1693                            bb.append_null();
1694                            sb.append_null();
1695                            eb.append_null();
1696                        }
1697                        TypeTag::Boolean => {
1698                            tag_b.append_value(TypeTag::Boolean as u8);
1699                            nb.append_null();
1700                            if let Some(a) = &ch_ref.booleans {
1701                                let ba = a.as_any().downcast_ref::<BooleanArray>().unwrap();
1702                                if ba.is_null(i) {
1703                                    bb.append_null();
1704                                } else {
1705                                    bb.append_value(ba.value(i));
1706                                    non_bool += 1;
1707                                }
1708                            } else {
1709                                bb.append_null();
1710                            }
1711                            sb.append_null();
1712                            eb.append_null();
1713                        }
1714                        TypeTag::Text => {
1715                            tag_b.append_value(TypeTag::Text as u8);
1716                            nb.append_null();
1717                            bb.append_null();
1718                            if let Some(a) = &ch_ref.text {
1719                                let sa = a.as_any().downcast_ref::<StringArray>().unwrap();
1720                                if sa.is_null(i) {
1721                                    sb.append_null();
1722                                } else {
1723                                    sb.append_value(sa.value(i));
1724                                    non_text += 1;
1725                                }
1726                            } else {
1727                                sb.append_null();
1728                            }
1729                            eb.append_null();
1730                        }
1731                        TypeTag::Error => {
1732                            tag_b.append_value(TypeTag::Error as u8);
1733                            nb.append_null();
1734                            bb.append_null();
1735                            sb.append_null();
1736                            if let Some(a) = &ch_ref.errors {
1737                                let ea = a.as_any().downcast_ref::<UInt8Array>().unwrap();
1738                                if ea.is_null(i) {
1739                                    eb.append_null();
1740                                } else {
1741                                    eb.append_value(ea.value(i));
1742                                    non_err += 1;
1743                                }
1744                            } else {
1745                                eb.append_null();
1746                            }
1747                        }
1748                        TypeTag::Pending => {
1749                            tag_b.append_value(TypeTag::Pending as u8);
1750                            nb.append_null();
1751                            bb.append_null();
1752                            sb.append_null();
1753                            eb.append_null();
1754                        }
1755                    }
1756                }
1757            }
1758
1759            let tags = Arc::new(tag_b.finish());
1760            let numbers = {
1761                let a = nb.finish();
1762                if non_num == 0 {
1763                    None
1764                } else {
1765                    Some(Arc::new(a))
1766                }
1767            };
1768            let booleans = {
1769                let a = bb.finish();
1770                if non_bool == 0 {
1771                    None
1772                } else {
1773                    Some(Arc::new(a))
1774                }
1775            };
1776            let text = {
1777                let a = sb.finish();
1778                if non_text == 0 {
1779                    None
1780                } else {
1781                    Some(Arc::new(a) as ArrayRef)
1782                }
1783            };
1784            let errors = {
1785                let a = eb.finish();
1786                if non_err == 0 {
1787                    None
1788                } else {
1789                    Some(Arc::new(a))
1790                }
1791            };
1792
1793            (
1794                len, tags, numbers, booleans, text, errors, non_num, non_bool, non_text, non_err,
1795            )
1796        };
1797
1798        let Some(ch_mut) = self.columns[col_idx].chunk_mut(ch_idx) else {
1799            return 0;
1800        };
1801
1802        ch_mut.type_tag = tags;
1803        ch_mut.numbers = numbers;
1804        ch_mut.booleans = booleans;
1805        ch_mut.text = text;
1806        ch_mut.errors = errors;
1807        let freed = ch_mut.computed_overlay.clear();
1808        ch_mut.lowered_text = OnceCell::new();
1809        ch_mut.meta.len = len;
1810        ch_mut.meta.non_null_num = non_num;
1811        ch_mut.meta.non_null_bool = non_bool;
1812        ch_mut.meta.non_null_text = non_text;
1813        ch_mut.meta.non_null_err = non_err;
1814        freed
1815    }
1816
1817    /// Compact a sparse chunk's computed overlay into its base arrays.
1818    /// Equivalent to `compact_computed_overlay_chunk` but for sparse chunks.
1819    pub fn compact_computed_overlay_sparse_chunk(
1820        &mut self,
1821        col_idx: usize,
1822        ch_idx: usize,
1823    ) -> usize {
1824        // Sparse chunks are accessed via the same chunk/chunk_mut API,
1825        // so we delegate to the dense method which already handles both.
1826        self.compact_computed_overlay_chunk(col_idx, ch_idx)
1827    }
1828
1829    /// Insert `count` rows before absolute 0-based row `before`.
1830    pub fn insert_rows(&mut self, before: usize, count: usize) {
1831        if count == 0 {
1832            return;
1833        }
1834
1835        let total_rows = self.nrows as usize;
1836        if total_rows == 0 {
1837            self.nrows = count as u32;
1838            if self.nrows > 0 && self.chunk_starts.is_empty() {
1839                self.chunk_starts.push(0);
1840            }
1841            return;
1842        }
1843
1844        // Ensure a valid chunk map for non-empty sheets.
1845        if self.chunk_starts.is_empty() {
1846            self.chunk_starts.push(0);
1847        }
1848
1849        // "Dense" mode: every column has every chunk (legacy invariant).
1850        let dense_aligned = self
1851            .columns
1852            .iter()
1853            .all(|c| c.sparse_chunks.is_empty() && c.chunks.len() == self.chunk_starts.len());
1854
1855        let insert_at = before.min(total_rows);
1856        let (split_idx, split_off) = if insert_at == total_rows {
1857            // Append at end: split after last chunk.
1858            let last_idx = self.chunk_starts.len() - 1;
1859            let last_start = self.chunk_starts[last_idx];
1860            let last_len = total_rows.saturating_sub(last_start);
1861            (last_idx, last_len)
1862        } else {
1863            self.chunk_of_row(insert_at).unwrap_or((0, 0))
1864        };
1865
1866        if dense_aligned {
1867            // Rebuild chunks for each column (including inserted empty chunk) and recompute starts.
1868            for col in &mut self.columns {
1869                let mut new_chunks: Vec<ColumnChunk> = Vec::with_capacity(col.chunks.len() + 2);
1870                for i in 0..col.chunks.len() {
1871                    if i != split_idx {
1872                        new_chunks.push(col.chunks[i].clone());
1873                    } else {
1874                        let orig = &col.chunks[i];
1875                        let len = orig.type_tag.len();
1876                        if split_off > 0 {
1877                            new_chunks.push(Self::slice_chunk(orig, 0, split_off));
1878                        }
1879                        new_chunks.push(Self::make_empty_chunk(count));
1880                        if split_off < len {
1881                            new_chunks.push(Self::slice_chunk(orig, split_off, len - split_off));
1882                        }
1883                    }
1884                }
1885                col.chunks = new_chunks;
1886                col.sparse_chunks.clear();
1887            }
1888            self.nrows = (total_rows + count) as u32;
1889            self.recompute_chunk_starts();
1890            return;
1891        }
1892
1893        // Sparse-aware mode: `chunk_starts` is authoritative and missing chunks are treated as empty.
1894        #[derive(Clone, Copy)]
1895        enum PlanItem {
1896            Slice {
1897                old_idx: usize,
1898                off: usize,
1899                len: usize,
1900            },
1901            Empty {
1902                len: usize,
1903            },
1904        }
1905
1906        let mut plan: Vec<PlanItem> = Vec::with_capacity(self.chunk_starts.len() + 2);
1907        for old_idx in 0..self.chunk_starts.len() {
1908            let ch_start = self.chunk_starts[old_idx];
1909            let ch_end = self
1910                .chunk_starts
1911                .get(old_idx + 1)
1912                .copied()
1913                .unwrap_or(total_rows);
1914            let ch_len = ch_end.saturating_sub(ch_start);
1915            if ch_len == 0 {
1916                continue;
1917            }
1918
1919            if old_idx != split_idx {
1920                plan.push(PlanItem::Slice {
1921                    old_idx,
1922                    off: 0,
1923                    len: ch_len,
1924                });
1925                continue;
1926            }
1927
1928            let left_len = split_off.min(ch_len);
1929            let right_len = ch_len.saturating_sub(left_len);
1930            if left_len > 0 {
1931                plan.push(PlanItem::Slice {
1932                    old_idx,
1933                    off: 0,
1934                    len: left_len,
1935                });
1936            }
1937            plan.push(PlanItem::Empty { len: count });
1938            if right_len > 0 {
1939                plan.push(PlanItem::Slice {
1940                    old_idx,
1941                    off: left_len,
1942                    len: right_len,
1943                });
1944            }
1945        }
1946
1947        let mut new_starts: Vec<usize> = Vec::with_capacity(plan.len());
1948        let mut cur = 0usize;
1949        for item in &plan {
1950            let len = match *item {
1951                PlanItem::Slice { len, .. } => len,
1952                PlanItem::Empty { len } => len,
1953            };
1954            if len == 0 {
1955                continue;
1956            }
1957            new_starts.push(cur);
1958            cur = cur.saturating_add(len);
1959        }
1960
1961        debug_assert_eq!(cur, total_rows.saturating_add(count));
1962
1963        // Update sheet row layout first.
1964        self.nrows = (total_rows + count) as u32;
1965        self.chunk_starts = new_starts;
1966
1967        // Rebuild stored chunks per column using the plan.
1968        for col in &mut self.columns {
1969            let old_dense = std::mem::take(&mut col.chunks);
1970            let old_sparse = std::mem::take(&mut col.sparse_chunks);
1971            let get_old = |idx: usize| -> Option<&ColumnChunk> {
1972                if idx < old_dense.len() {
1973                    Some(&old_dense[idx])
1974                } else {
1975                    old_sparse.get(&idx)
1976                }
1977            };
1978
1979            let mut dense: Vec<ColumnChunk> = Vec::new();
1980            let mut sparse: FxHashMap<usize, ColumnChunk> = FxHashMap::default();
1981            let mut dense_prefix = true;
1982
1983            for (new_idx, item) in plan.iter().enumerate() {
1984                let produced: Option<ColumnChunk> = match *item {
1985                    PlanItem::Empty { .. } => None,
1986                    PlanItem::Slice { old_idx, off, len } => match get_old(old_idx) {
1987                        Some(orig) => {
1988                            if off == 0 && len == orig.type_tag.len() {
1989                                Some(orig.clone())
1990                            } else {
1991                                Some(Self::slice_chunk(orig, off, len))
1992                            }
1993                        }
1994                        None => None,
1995                    },
1996                };
1997
1998                if let Some(ch) = produced {
1999                    if dense_prefix && new_idx == dense.len() {
2000                        dense.push(ch);
2001                    } else {
2002                        sparse.insert(new_idx, ch);
2003                        dense_prefix = false;
2004                    }
2005                } else if dense_prefix && new_idx == dense.len() {
2006                    dense_prefix = false;
2007                }
2008            }
2009
2010            col.chunks = dense;
2011            col.sparse_chunks = sparse;
2012        }
2013    }
2014
2015    /// Delete `count` rows starting from absolute 0-based row `start`.
2016    pub fn delete_rows(&mut self, start: usize, count: usize) {
2017        if count == 0 || self.nrows == 0 {
2018            return;
2019        }
2020
2021        let total_rows = self.nrows as usize;
2022        if start >= total_rows {
2023            return;
2024        }
2025        let end = (start + count).min(total_rows);
2026        let del_len = end.saturating_sub(start);
2027        if del_len == 0 {
2028            return;
2029        }
2030
2031        // Ensure a valid chunk map for non-empty sheets.
2032        if total_rows > 0 && self.chunk_starts.is_empty() {
2033            self.chunk_starts.push(0);
2034        }
2035
2036        // "Dense" mode: every column has every chunk (legacy invariant).
2037        let dense_aligned = self
2038            .columns
2039            .iter()
2040            .all(|c| c.sparse_chunks.is_empty() && c.chunks.len() == self.chunk_starts.len());
2041
2042        if dense_aligned {
2043            // Dense rebuild by slicing out the deleted window.
2044            for col in &mut self.columns {
2045                let mut new_chunks: Vec<ColumnChunk> = Vec::new();
2046                let mut cur_start = 0usize;
2047                for ch in &col.chunks {
2048                    let len = ch.type_tag.len();
2049                    let ch_end = cur_start + len;
2050                    // No overlap
2051                    if ch_end <= start || cur_start >= end {
2052                        new_chunks.push(ch.clone());
2053                    } else {
2054                        // Overlap exists
2055                        let del_start = start.max(cur_start);
2056                        let del_end = end.min(ch_end);
2057                        let left_len = del_start.saturating_sub(cur_start);
2058                        let right_len = ch_end.saturating_sub(del_end);
2059                        if left_len > 0 {
2060                            new_chunks.push(Self::slice_chunk(ch, 0, left_len));
2061                        }
2062                        if right_len > 0 {
2063                            let off = len - right_len;
2064                            new_chunks.push(Self::slice_chunk(ch, off, right_len));
2065                        }
2066                    }
2067                    cur_start = ch_end;
2068                }
2069                col.chunks = new_chunks;
2070                col.sparse_chunks.clear();
2071            }
2072            self.nrows = (total_rows - del_len) as u32;
2073            self.recompute_chunk_starts();
2074            return;
2075        }
2076
2077        // Sparse-aware mode: `chunk_starts` is authoritative and missing chunks are treated as empty.
2078        #[derive(Clone, Copy)]
2079        enum PlanItem {
2080            Slice {
2081                old_idx: usize,
2082                off: usize,
2083                len: usize,
2084            },
2085        }
2086
2087        let mut plan: Vec<PlanItem> = Vec::with_capacity(self.chunk_starts.len());
2088        for old_idx in 0..self.chunk_starts.len() {
2089            let ch_start = self.chunk_starts[old_idx];
2090            let ch_end = self
2091                .chunk_starts
2092                .get(old_idx + 1)
2093                .copied()
2094                .unwrap_or(total_rows);
2095            let ch_len = ch_end.saturating_sub(ch_start);
2096            if ch_len == 0 {
2097                continue;
2098            }
2099
2100            // No overlap
2101            if ch_end <= start || ch_start >= end {
2102                plan.push(PlanItem::Slice {
2103                    old_idx,
2104                    off: 0,
2105                    len: ch_len,
2106                });
2107                continue;
2108            }
2109
2110            // Left remainder
2111            if start > ch_start {
2112                let left_end = start.min(ch_end);
2113                let left_len = left_end.saturating_sub(ch_start);
2114                if left_len > 0 {
2115                    plan.push(PlanItem::Slice {
2116                        old_idx,
2117                        off: 0,
2118                        len: left_len,
2119                    });
2120                }
2121            }
2122
2123            // Right remainder
2124            if end < ch_end {
2125                let right_off = end.saturating_sub(ch_start);
2126                let right_len = ch_end.saturating_sub(end);
2127                if right_len > 0 {
2128                    plan.push(PlanItem::Slice {
2129                        old_idx,
2130                        off: right_off,
2131                        len: right_len,
2132                    });
2133                }
2134            }
2135        }
2136
2137        let mut new_starts: Vec<usize> = Vec::with_capacity(plan.len());
2138        let mut cur = 0usize;
2139        for item in &plan {
2140            let len = match *item {
2141                PlanItem::Slice { len, .. } => len,
2142            };
2143            if len == 0 {
2144                continue;
2145            }
2146            new_starts.push(cur);
2147            cur = cur.saturating_add(len);
2148        }
2149
2150        debug_assert_eq!(cur, total_rows.saturating_sub(del_len));
2151
2152        // Update sheet row layout first.
2153        self.nrows = (total_rows - del_len) as u32;
2154        self.chunk_starts = new_starts;
2155
2156        // Rebuild stored chunks per column using the plan.
2157        for col in &mut self.columns {
2158            let old_dense = std::mem::take(&mut col.chunks);
2159            let old_sparse = std::mem::take(&mut col.sparse_chunks);
2160            let get_old = |idx: usize| -> Option<&ColumnChunk> {
2161                if idx < old_dense.len() {
2162                    Some(&old_dense[idx])
2163                } else {
2164                    old_sparse.get(&idx)
2165                }
2166            };
2167
2168            let mut dense: Vec<ColumnChunk> = Vec::new();
2169            let mut sparse: FxHashMap<usize, ColumnChunk> = FxHashMap::default();
2170            let mut dense_prefix = true;
2171
2172            for (new_idx, item) in plan.iter().enumerate() {
2173                let produced: Option<ColumnChunk> = match *item {
2174                    PlanItem::Slice { old_idx, off, len } => match get_old(old_idx) {
2175                        Some(orig) => {
2176                            if off == 0 && len == orig.type_tag.len() {
2177                                Some(orig.clone())
2178                            } else {
2179                                Some(Self::slice_chunk(orig, off, len))
2180                            }
2181                        }
2182                        None => None,
2183                    },
2184                };
2185
2186                if let Some(ch) = produced {
2187                    if dense_prefix && new_idx == dense.len() {
2188                        dense.push(ch);
2189                    } else {
2190                        sparse.insert(new_idx, ch);
2191                        dense_prefix = false;
2192                    }
2193                } else if dense_prefix && new_idx == dense.len() {
2194                    dense_prefix = false;
2195                }
2196            }
2197
2198            col.chunks = dense;
2199            col.sparse_chunks = sparse;
2200        }
2201    }
2202
2203    /// Insert `count` columns before absolute 0-based column `before` with empty chunks.
2204    pub fn insert_columns(&mut self, before: usize, count: usize) {
2205        if count == 0 {
2206            return;
2207        }
2208        // Determine chunk schema from first column if present
2209        let empty_col = |lens: &[usize]| -> ArrowColumn {
2210            let mut chunks = Vec::with_capacity(lens.len());
2211            for &l in lens {
2212                chunks.push(Self::make_empty_chunk(l));
2213            }
2214            ArrowColumn {
2215                chunks,
2216                sparse_chunks: FxHashMap::default(),
2217                index: 0,
2218            }
2219        };
2220        let dense_aligned = !self.columns.is_empty()
2221            && self
2222                .columns
2223                .iter()
2224                .all(|c| c.sparse_chunks.is_empty() && c.chunks.len() == self.chunk_starts.len());
2225
2226        let lens: Vec<usize> = if dense_aligned {
2227            self.columns[0]
2228                .chunks
2229                .iter()
2230                .map(|c| c.type_tag.len())
2231                .collect()
2232        } else if self.columns.is_empty() {
2233            // No columns: single chunk matching nrows if any
2234            if self.nrows > 0 {
2235                vec![self.nrows as usize]
2236            } else {
2237                Vec::new()
2238            }
2239        } else {
2240            // Sparse sheet: keep inserted columns cheap by materializing no chunks.
2241            Vec::new()
2242        };
2243        let mut cols_new: Vec<ArrowColumn> = Vec::with_capacity(self.columns.len() + count);
2244        let before_idx = before.min(self.columns.len());
2245        for (i, col) in self.columns.iter_mut().enumerate() {
2246            if i == before_idx {
2247                for _ in 0..count {
2248                    cols_new.push(empty_col(&lens));
2249                }
2250            }
2251            cols_new.push(col.clone());
2252        }
2253        if before_idx == self.columns.len() {
2254            for _ in 0..count {
2255                cols_new.push(empty_col(&lens));
2256            }
2257        }
2258        // Fix column indices
2259        for (idx, col) in cols_new.iter_mut().enumerate() {
2260            col.index = idx as u32;
2261        }
2262        self.columns = cols_new;
2263        // chunk_starts unchanged; lens were matched
2264    }
2265
2266    /// Delete `count` columns starting at absolute 0-based column `start`.
2267    pub fn delete_columns(&mut self, start: usize, count: usize) {
2268        if count == 0 || self.columns.is_empty() {
2269            return;
2270        }
2271        let end = (start + count).min(self.columns.len());
2272        if start >= end {
2273            return;
2274        }
2275        self.columns.drain(start..end);
2276        for (idx, col) in self.columns.iter_mut().enumerate() {
2277            col.index = idx as u32;
2278        }
2279    }
2280}
2281
/// Small, copyable per-column summary.
///
/// NOTE(review): the code that fills this struct is outside the reviewed section, so the
/// field descriptions below are inferred from the field names and from the chunk metadata
/// counters used elsewhere in this file — confirm against the constructor.
#[derive(Debug, Clone, Copy)]
pub struct ColumnShape {
    /// Column position; presumably mirrors `ArrowColumn::index` (0-based).
    pub index: u32,
    /// Number of chunks reported for the column.
    pub chunks: usize,
    /// Number of rows reported for the column.
    pub rows: usize,
    /// Presumably true when the column holds at least one non-null numeric value.
    pub has_num: bool,
    /// Presumably true when the column holds at least one non-null boolean value.
    pub has_bool: bool,
    /// Presumably true when the column holds at least one non-null text value.
    pub has_text: bool,
    /// Presumably true when the column holds at least one non-null error value.
    pub has_err: bool,
}
2292
2293#[cfg(test)]
2294mod tests {
2295    use super::*;
2296    use arrow_array::Array;
2297    use arrow_schema::DataType;
2298
2299    #[test]
2300    fn ingest_mixed_rows_into_lanes_and_tags() {
2301        let mut b = IngestBuilder::new("Sheet1", 1, 1024, crate::engine::DateSystem::Excel1900);
2302        let data = vec![
2303            LiteralValue::Number(42.5),                   // Number
2304            LiteralValue::Empty,                          // Empty
2305            LiteralValue::Text(String::new()),            // Empty text (Text lane)
2306            LiteralValue::Boolean(true),                  // Boolean
2307            LiteralValue::Error(ExcelError::new_value()), // Error
2308        ];
2309        for v in &data {
2310            b.append_row(std::slice::from_ref(v)).unwrap();
2311        }
2312        let sheet = b.finish();
2313        assert_eq!(sheet.nrows, 5);
2314        assert_eq!(sheet.columns.len(), 1);
2315        assert_eq!(sheet.columns[0].chunks.len(), 1);
2316        let ch = &sheet.columns[0].chunks[0];
2317
2318        // Type tags
2319        let tags = ch.type_tag.values();
2320        assert_eq!(tags.len(), 5);
2321        assert_eq!(tags[0], TypeTag::Number as u8);
2322        assert_eq!(tags[1], TypeTag::Empty as u8);
2323        assert_eq!(tags[2], TypeTag::Text as u8);
2324        assert_eq!(tags[3], TypeTag::Boolean as u8);
2325        assert_eq!(tags[4], TypeTag::Error as u8);
2326
2327        // Numbers lane validity
2328        let nums = ch.numbers.as_ref().unwrap();
2329        assert_eq!(nums.len(), 5);
2330        assert_eq!(nums.null_count(), 4);
2331        assert!(nums.is_valid(0));
2332
2333        // Booleans lane validity
2334        let bools = ch.booleans.as_ref().unwrap();
2335        assert_eq!(bools.len(), 5);
2336        assert_eq!(bools.null_count(), 4);
2337        assert!(bools.is_valid(3));
2338
2339        // Text lane validity
2340        let txt = ch.text.as_ref().unwrap();
2341        assert_eq!(txt.len(), 5);
2342        assert_eq!(txt.null_count(), 4);
2343        assert!(txt.is_valid(2)); // ""
2344
2345        // Errors lane
2346        let errs = ch.errors.as_ref().unwrap();
2347        assert_eq!(errs.len(), 5);
2348        assert_eq!(errs.null_count(), 4);
2349        assert!(errs.is_valid(4));
2350    }
2351
2352    #[test]
2353    fn range_view_get_cell_and_padding() {
2354        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
2355        b.append_row(&[LiteralValue::Number(1.0), LiteralValue::Text("".into())])
2356            .unwrap();
2357        b.append_row(&[LiteralValue::Empty, LiteralValue::Text("x".into())])
2358            .unwrap();
2359        b.append_row(&[LiteralValue::Boolean(true), LiteralValue::Empty])
2360            .unwrap();
2361        let sheet = b.finish();
2362        let rv = sheet.range_view(0, 0, 2, 1);
2363        assert_eq!(rv.dims(), (3, 2));
2364        // Inside
2365        assert_eq!(rv.get_cell(0, 0), LiteralValue::Number(1.0));
2366        assert_eq!(rv.get_cell(0, 1), LiteralValue::Text(String::new())); // empty string
2367        assert_eq!(rv.get_cell(1, 0), LiteralValue::Empty); // truly Empty
2368        assert_eq!(rv.get_cell(2, 0), LiteralValue::Boolean(true));
2369        // OOB padding
2370        assert_eq!(rv.get_cell(3, 0), LiteralValue::Empty);
2371        assert_eq!(rv.get_cell(0, 2), LiteralValue::Empty);
2372
2373        // Numbers slices should produce one 2-row and one 1-row segment
2374        let nums: Vec<_> = rv.numbers_slices().map(|r| r.unwrap()).collect();
2375        assert_eq!(nums.len(), 2);
2376        assert_eq!(nums[0].0, 0);
2377        assert_eq!(nums[0].1, 2);
2378        assert_eq!(nums[1].0, 2);
2379        assert_eq!(nums[1].1, 1);
2380    }
2381
2382    #[test]
2383    fn overlay_precedence_user_over_computed() {
2384        let mut b = IngestBuilder::new("S", 1, 8, crate::engine::DateSystem::Excel1900);
2385        b.append_row(&[LiteralValue::Number(1.0)]).unwrap();
2386        b.append_row(&[LiteralValue::Empty]).unwrap();
2387        b.append_row(&[LiteralValue::Empty]).unwrap();
2388        let mut sheet = b.finish();
2389
2390        let (ch_i, off) = sheet.chunk_of_row(0).unwrap();
2391        sheet.columns[0].chunks[ch_i]
2392            .computed_overlay
2393            .set(off, OverlayValue::Number(2.0));
2394
2395        let rv0 = sheet.range_view(0, 0, 0, 0);
2396        assert_eq!(rv0.get_cell(0, 0), LiteralValue::Number(2.0));
2397        let nums0: Vec<_> = rv0.numbers_slices().map(|r| r.unwrap()).collect();
2398        assert_eq!(nums0.len(), 1);
2399        assert_eq!(nums0[0].2[0].value(0), 2.0);
2400
2401        sheet.columns[0].chunks[ch_i]
2402            .overlay
2403            .set(off, OverlayValue::Number(3.0));
2404
2405        let rv1 = sheet.range_view(0, 0, 0, 0);
2406        assert_eq!(rv1.get_cell(0, 0), LiteralValue::Number(3.0));
2407        let nums1: Vec<_> = rv1.numbers_slices().map(|r| r.unwrap()).collect();
2408        assert_eq!(nums1.len(), 1);
2409        assert_eq!(nums1[0].2[0].value(0), 3.0);
2410    }
2411
2412    #[test]
2413    fn row_chunk_slices_shape() {
2414        // chunk_rows=2 leads to two slices for 3 rows
2415        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
2416        b.append_row(&[LiteralValue::Text("a".into()), LiteralValue::Number(1.0)])
2417            .unwrap();
2418        b.append_row(&[LiteralValue::Text("b".into()), LiteralValue::Number(2.0)])
2419            .unwrap();
2420        b.append_row(&[LiteralValue::Text("c".into()), LiteralValue::Number(3.0)])
2421            .unwrap();
2422        let sheet = b.finish();
2423        let rv = sheet.range_view(0, 0, 2, 1);
2424        let slices: Vec<_> = rv.iter_row_chunks().map(|r| r.unwrap()).collect();
2425        assert_eq!(slices.len(), 2);
2426        assert_eq!(slices[0].row_start, 0);
2427        assert_eq!(slices[0].row_len, 2);
2428        assert_eq!(slices[0].cols.len(), 2);
2429        assert_eq!(slices[1].row_start, 2);
2430        assert_eq!(slices[1].row_len, 1);
2431        assert_eq!(slices[1].cols.len(), 2);
2432    }
2433
2434    #[test]
2435    fn oob_columns_are_padded() {
2436        // Build with 2 columns; request 3 columns (ec beyond last col)
2437        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
2438        b.append_row(&[LiteralValue::Number(1.0), LiteralValue::Text("a".into())])
2439            .unwrap();
2440        b.append_row(&[LiteralValue::Number(2.0), LiteralValue::Text("b".into())])
2441            .unwrap();
2442        let sheet = b.finish();
2443        // Request cols [0..=2] → 3 columns with padding
2444        let rv = sheet.range_view(0, 0, 1, 2);
2445        assert_eq!(rv.dims(), (2, 3));
2446        let slices: Vec<_> = rv.iter_row_chunks().map(|r| r.unwrap()).collect();
2447        assert!(!slices.is_empty());
2448        for cs in &slices {
2449            assert_eq!(cs.cols.len(), 3);
2450        }
2451        // Also validate typed slices return 3 entries per segment
2452        for res in rv.numbers_slices() {
2453            let (_rs, _rl, cols) = res.unwrap();
2454            assert_eq!(cols.len(), 3);
2455        }
2456        for res in rv.booleans_slices() {
2457            let (_rs, _rl, cols) = res.unwrap();
2458            assert_eq!(cols.len(), 3);
2459        }
2460        for res in rv.text_slices() {
2461            let (_rs, _rl, cols) = res.unwrap();
2462            assert_eq!(cols.len(), 3);
2463        }
2464        for res in rv.errors_slices() {
2465            let (_rs, _rl, cols) = res.unwrap();
2466            assert_eq!(cols.len(), 3);
2467        }
2468        for res in rv.lowered_text_slices() {
2469            let (_rs, _rl, cols) = res.unwrap();
2470            assert_eq!(cols.len(), 3);
2471        }
2472    }
2473
2474    #[test]
2475    fn reversed_range_is_empty() {
2476        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
2477        b.append_row(&[LiteralValue::Number(1.0)]).unwrap();
2478        b.append_row(&[LiteralValue::Number(2.0)]).unwrap();
2479        let sheet = b.finish();
2480        let rv = sheet.range_view(3, 0, 1, 0); // er < sr
2481        assert_eq!(rv.dims(), (0, 0));
2482        assert!(rv.iter_row_chunks().next().is_none());
2483        assert_eq!(rv.get_cell(0, 0), LiteralValue::Empty);
2484    }
2485
2486    #[test]
2487    fn chunk_alignment_invariant() {
2488        let mut b = IngestBuilder::new("S", 3, 2, crate::engine::DateSystem::Excel1900);
2489        // 5 rows, 2-row chunks => 3 chunks (2,2,1)
2490        for r in 0..5 {
2491            b.append_row(&[
2492                LiteralValue::Number(r as f64),
2493                LiteralValue::Text(format!("{r}")),
2494                if r % 2 == 0 {
2495                    LiteralValue::Empty
2496                } else {
2497                    LiteralValue::Boolean(true)
2498                },
2499            ])
2500            .unwrap();
2501        }
2502        let sheet = b.finish();
2503        // chunk_starts should be [0,2,4]
2504        assert_eq!(sheet.chunk_starts, vec![0, 2, 4]);
2505        // All columns must share per-chunk lengths equal to [2,2,1]
2506        let lens0: Vec<usize> = sheet.columns[0]
2507            .chunks
2508            .iter()
2509            .map(|ch| ch.type_tag.len())
2510            .collect();
2511        for col in &sheet.columns[1..] {
2512            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2513            assert_eq!(lens, lens0);
2514        }
2515    }
2516
2517    #[test]
2518    fn chunking_splits_rows() {
2519        // Two columns, chunk size 2 → expect two chunks
2520        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
2521        let rows = vec![
2522            vec![LiteralValue::Number(1.0), LiteralValue::Text("a".into())],
2523            vec![LiteralValue::Empty, LiteralValue::Text("b".into())],
2524            vec![LiteralValue::Boolean(true), LiteralValue::Empty],
2525        ];
2526        for r in rows {
2527            b.append_row(&r).unwrap();
2528        }
2529        let sheet = b.finish();
2530        assert_eq!(sheet.columns[0].chunks.len(), 2);
2531        assert_eq!(sheet.columns[1].chunks.len(), 2);
2532        assert_eq!(sheet.columns[0].chunks[0].numbers_or_null().len(), 2);
2533        assert_eq!(sheet.columns[0].chunks[1].numbers_or_null().len(), 1);
2534    }
2535
2536    #[test]
2537    fn pending_is_not_error() {
2538        let mut b = IngestBuilder::new("S", 1, 8, crate::engine::DateSystem::Excel1900);
2539        b.append_row(&[LiteralValue::Pending]).unwrap();
2540        let sheet = b.finish();
2541        let ch = &sheet.columns[0].chunks[0];
2542        // tag is Pending
2543        assert_eq!(ch.type_tag.values()[0], super::TypeTag::Pending as u8);
2544        // errors lane is effectively null
2545        let errs = ch.errors_or_null();
2546        assert_eq!(errs.null_count(), 1);
2547    }
2548
2549    #[test]
2550    fn all_null_numeric_lane_uses_null_array() {
2551        // Only text values in first column → numbers lane should be all null with correct dtype
2552        let mut b = IngestBuilder::new("S", 1, 16, crate::engine::DateSystem::Excel1900);
2553        b.append_row(&[LiteralValue::Text("a".into())]).unwrap();
2554        b.append_row(&[LiteralValue::Text("".into())]).unwrap();
2555        b.append_row(&[LiteralValue::Text("b".into())]).unwrap();
2556        let sheet = b.finish();
2557        let ch = &sheet.columns[0].chunks[0];
2558        let nums = ch.numbers_or_null();
2559        assert_eq!(nums.len(), 3);
2560        assert_eq!(nums.null_count(), 3);
2561        assert_eq!(nums.data_type(), &DataType::Float64);
2562    }
2563
2564    #[test]
2565    fn row_insert_delete_across_chunk_boundaries_with_overlays() {
2566        // Build 1 column, chunk size 4, 10 rows -> chunks at [0..4],[4..8],[8..10]
2567        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
2568        for _ in 0..10 {
2569            b.append_row(&[LiteralValue::Empty]).unwrap();
2570        }
2571        let mut sheet = b.finish();
2572        // Add overlays at row 3 and row 4
2573        {
2574            let (c0, o0) = sheet.chunk_of_row(3).unwrap();
2575            sheet.columns[0].chunks[c0]
2576                .overlay
2577                .set(o0, OverlayValue::Number(30.0));
2578            let (c1, o1) = sheet.chunk_of_row(4).unwrap();
2579            sheet.columns[0].chunks[c1]
2580                .overlay
2581                .set(o1, OverlayValue::Number(40.0));
2582        }
2583        // Insert 2 rows before row 4 (at chunk boundary)
2584        sheet.insert_rows(4, 2);
2585        assert_eq!(sheet.nrows, 12);
2586        // Validate overlays moved correctly: 3 stays, 4 becomes Empty, 6 has 40
2587        let av = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2588        assert_eq!(av.get_cell(3, 0), LiteralValue::Number(30.0));
2589        assert_eq!(av.get_cell(4, 0), LiteralValue::Empty);
2590        assert_eq!(av.get_cell(6, 0), LiteralValue::Number(40.0));
2591
2592        // Now delete 3 rows starting at 3: removes rows 3,4,5 → moves 40.0 from 6 → 3
2593        sheet.delete_rows(3, 3);
2594        assert_eq!(sheet.nrows, 9);
2595        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2596        assert_eq!(av2.get_cell(3, 0), LiteralValue::Number(40.0));
2597        // All columns share chunk lengths; chunk_starts monotonic and cover nrows
2598        let lens0: Vec<usize> = sheet.columns[0]
2599            .chunks
2600            .iter()
2601            .map(|ch| ch.type_tag.len())
2602            .collect();
2603        for col in &sheet.columns {
2604            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2605            assert_eq!(lens, lens0);
2606        }
2607        // chunk_starts should be monotonic and final chunk end == nrows
2608        assert!(sheet.chunk_starts.windows(2).all(|w| w[0] < w[1]));
2609        let last_start = *sheet.chunk_starts.last().unwrap_or(&0);
2610        let last_len = sheet.columns[0]
2611            .chunks
2612            .last()
2613            .map(|c| c.type_tag.len())
2614            .unwrap_or(0);
2615        assert_eq!(last_start + last_len, sheet.nrows as usize);
2616    }
2617
2618    #[test]
2619    fn column_insert_delete_retains_chunk_alignment() {
2620        let mut b = IngestBuilder::new("S", 3, 3, crate::engine::DateSystem::Excel1900);
2621        for _ in 0..5 {
2622            b.append_row(&[
2623                LiteralValue::Empty,
2624                LiteralValue::Empty,
2625                LiteralValue::Empty,
2626            ])
2627            .unwrap();
2628        }
2629        let mut sheet = b.finish();
2630        // Record reference chunk lengths of first column
2631        let ref_lens: Vec<usize> = sheet.columns[0]
2632            .chunks
2633            .iter()
2634            .map(|ch| ch.type_tag.len())
2635            .collect();
2636        // Insert 2 columns before index 1
2637        sheet.insert_columns(1, 2);
2638        assert_eq!(sheet.columns.len(), 5);
2639        for col in &sheet.columns {
2640            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2641            assert_eq!(lens, ref_lens);
2642        }
2643        let starts_before = sheet.chunk_starts.clone();
2644        // Delete 2 columns starting at index 2 → back to 3 columns
2645        sheet.delete_columns(2, 2);
2646        assert_eq!(sheet.columns.len(), 3);
2647        for col in &sheet.columns {
2648            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2649            assert_eq!(lens, ref_lens);
2650        }
2651        // chunk_starts unchanged by column operations
2652        assert_eq!(sheet.chunk_starts, starts_before);
2653    }
2654
2655    #[test]
2656    fn multiple_adjacent_row_ops_overlay_mixed_types() {
2657        use formualizer_common::ExcelErrorKind;
2658        // Two columns to ensure alignment preserved across columns
2659        let mut b = IngestBuilder::new("S", 2, 3, crate::engine::DateSystem::Excel1900);
2660        for _ in 0..9 {
2661            b.append_row(&[LiteralValue::Empty, LiteralValue::Empty])
2662                .unwrap();
2663        }
2664        let mut sheet = b.finish();
2665        // Overlays at rows (0-based): 2->Number, 3->Text, 5->Boolean, 6->Error, 8->Empty
2666        // Column 0 only
2667        let set_ov = |sh: &mut ArrowSheet, row: usize, ov: OverlayValue| {
2668            let (ch_i, off) = sh.chunk_of_row(row).unwrap();
2669            let _ = sh.columns[0].chunks[ch_i].overlay.set(off, ov);
2670        };
2671        set_ov(&mut sheet, 2, OverlayValue::Number(12.5));
2672        set_ov(&mut sheet, 3, OverlayValue::Text(Arc::from("hello")));
2673        set_ov(&mut sheet, 5, OverlayValue::Boolean(true));
2674        set_ov(
2675            &mut sheet,
2676            6,
2677            OverlayValue::Error(map_error_code(ExcelErrorKind::Div)),
2678        );
2679        set_ov(&mut sheet, 8, OverlayValue::Empty);
2680
2681        // Insert 1 row before index 3
2682        sheet.insert_rows(3, 1);
2683        // Expected new positions: 2->2 (unchanged), 3->4, 5->6, 6->7, 8->9
2684        let av1 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2685        assert_eq!(av1.get_cell(2, 0), LiteralValue::Number(12.5));
2686        assert_eq!(av1.get_cell(4, 0), LiteralValue::Text("hello".into()));
2687        assert_eq!(av1.get_cell(6, 0), LiteralValue::Boolean(true));
2688        match av1.get_cell(7, 0) {
2689            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
2690            other => panic!("expected error at row 7, got {other:?}"),
2691        }
2692        assert_eq!(av1.get_cell(9, 0), LiteralValue::Empty);
2693
2694        // Insert 2 rows before index 4 (adjacent to previous region)
2695        sheet.insert_rows(4, 2);
2696        // Now positions: 2->2, 4->6, 6->8, 7->9, 9->11
2697        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2698        assert_eq!(av2.get_cell(2, 0), LiteralValue::Number(12.5));
2699        assert_eq!(av2.get_cell(6, 0), LiteralValue::Text("hello".into()));
2700        assert_eq!(av2.get_cell(8, 0), LiteralValue::Boolean(true));
2701        match av2.get_cell(9, 0) {
2702            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
2703            other => panic!("expected error at row 9, got {other:?}"),
2704        }
2705        assert_eq!(av2.get_cell(11, 0), LiteralValue::Empty);
2706
2707        // Delete 2 rows starting at index 6 → removes the text at 6 and one empty row
2708        sheet.delete_rows(6, 2);
2709        let av3 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2710        // Remaining expected: 2->Number 12.5, 6 (was 8)->true, 7 (was 9)->#DIV/0!, 9 (was 11)->Empty
2711        assert_eq!(av3.get_cell(2, 0), LiteralValue::Number(12.5));
2712        assert_eq!(av3.get_cell(6, 0), LiteralValue::Boolean(true));
2713        match av3.get_cell(7, 0) {
2714            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
2715            other => panic!("expected error at row 8, got {other:?}"),
2716        }
2717        assert_eq!(av3.get_cell(9, 0), LiteralValue::Empty);
2718
2719        // Alignment checks
2720        let lens0: Vec<usize> = sheet.columns[0]
2721            .chunks
2722            .iter()
2723            .map(|ch| ch.type_tag.len())
2724            .collect();
2725        for col in &sheet.columns {
2726            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2727            assert_eq!(lens, lens0);
2728        }
2729        // chunk_starts monotonically increasing and cover nrows
2730        assert!(sheet.chunk_starts.windows(2).all(|w| w[0] < w[1]));
2731        let last_start = *sheet.chunk_starts.last().unwrap_or(&0);
2732        let last_len = sheet.columns[0]
2733            .chunks
2734            .last()
2735            .map(|c| c.type_tag.len())
2736            .unwrap_or(0);
2737        assert_eq!(last_start + last_len, sheet.nrows as usize);
2738    }
2739
2740    #[test]
2741    fn multiple_adjacent_column_ops_alignment() {
2742        // Start with 2 columns, chunk_rows=2, rows=5
2743        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
2744        for _ in 0..5 {
2745            b.append_row(&[LiteralValue::Empty, LiteralValue::Empty])
2746                .unwrap();
2747        }
2748        let mut sheet = b.finish();
2749        let ref_lens: Vec<usize> = sheet.columns[0]
2750            .chunks
2751            .iter()
2752            .map(|ch| ch.type_tag.len())
2753            .collect();
2754        // Insert 1 at start, then 2 at index 2 → columns = 5
2755        sheet.insert_columns(0, 1);
2756        sheet.insert_columns(2, 2);
2757        assert_eq!(sheet.columns.len(), 5);
2758        for col in &sheet.columns {
2759            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2760            assert_eq!(lens, ref_lens);
2761        }
2762        let starts_before = sheet.chunk_starts.clone();
2763        // Delete 1 at index 1, then 2 at the end if available
2764        sheet.delete_columns(1, 1);
2765        let remain = sheet.columns.len();
2766        if remain >= 3 {
2767            sheet.delete_columns(remain - 2, 2);
2768        }
2769        for col in &sheet.columns {
2770            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2771            assert_eq!(lens, ref_lens);
2772        }
2773        assert_eq!(sheet.chunk_starts, starts_before);
2774    }
2775
2776    #[test]
2777    fn overlays_on_multiple_columns_row_col_ops() {
2778        // 3 columns, chunk_rows=3, rows=6 → chunks [0..3), [3..6)
2779        let mut b = IngestBuilder::new("S", 3, 3, crate::engine::DateSystem::Excel1900);
2780        for _ in 0..6 {
2781            b.append_row(&[
2782                LiteralValue::Empty,
2783                LiteralValue::Empty,
2784                LiteralValue::Empty,
2785            ])
2786            .unwrap();
2787        }
2788        let mut sheet = b.finish();
2789        // Overlays at row2 and row3 across columns with different types
2790        let set_ov = |sh: &mut ArrowSheet, col: usize, row: usize, ov: OverlayValue| {
2791            let (ch_i, off) = sh.chunk_of_row(row).unwrap();
2792            let _ = sh.columns[col].chunks[ch_i].overlay.set(off, ov);
2793        };
2794        set_ov(&mut sheet, 0, 2, OverlayValue::Number(12.0));
2795        set_ov(&mut sheet, 1, 2, OverlayValue::Text(Arc::from("xx")));
2796        set_ov(&mut sheet, 2, 2, OverlayValue::Boolean(true));
2797        set_ov(&mut sheet, 0, 3, OverlayValue::Number(33.0));
2798        set_ov(&mut sheet, 1, 3, OverlayValue::Text(Arc::from("yy")));
2799        set_ov(&mut sheet, 2, 3, OverlayValue::Boolean(false));
2800
2801        // Insert a row at boundary (before row index 3)
2802        sheet.insert_rows(3, 1);
2803        // Now original row>=3 shift down by 1
2804        let av = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 2);
2805        // Row 2 values unchanged
2806        assert_eq!(av.get_cell(2, 0), LiteralValue::Number(12.0));
2807        assert_eq!(av.get_cell(2, 1), LiteralValue::Text("xx".into()));
2808        assert_eq!(av.get_cell(2, 2), LiteralValue::Boolean(true));
2809        // Row 3 became Empty (inserted)
2810        assert_eq!(av.get_cell(3, 0), LiteralValue::Empty);
2811        // Row 4 holds old row 3 overlays
2812        assert_eq!(av.get_cell(4, 0), LiteralValue::Number(33.0));
2813        assert_eq!(av.get_cell(4, 1), LiteralValue::Text("yy".into()));
2814        assert_eq!(av.get_cell(4, 2), LiteralValue::Boolean(false));
2815
2816        // Delete column 1 (middle), values shift left
2817        sheet.delete_columns(1, 1);
2818        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 1);
2819        assert_eq!(av2.get_cell(2, 0), LiteralValue::Number(12.0));
2820        // Column 1 now was old column 2
2821        assert_eq!(av2.get_cell(2, 1), LiteralValue::Boolean(true));
2822        assert_eq!(av2.get_cell(4, 0), LiteralValue::Number(33.0));
2823        assert_eq!(av2.get_cell(4, 1), LiteralValue::Boolean(false));
2824
2825        // Alignment preserved
2826        let lens0: Vec<usize> = sheet.columns[0]
2827            .chunks
2828            .iter()
2829            .map(|ch| ch.type_tag.len())
2830            .collect();
2831        for col in &sheet.columns {
2832            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2833            assert_eq!(lens, lens0);
2834        }
2835    }
2836
2837    #[test]
2838    fn effective_slices_overlay_precedence_numbers_text() {
2839        // 1 column, chunk_rows=3, rows=6. Base numbers in lane; overlays include text on row1 and number on row4.
2840        let mut b = IngestBuilder::new("S", 1, 3, crate::engine::DateSystem::Excel1900);
2841        for i in 0..6 {
2842            b.append_row(&[LiteralValue::Number((i + 1) as f64)])
2843                .unwrap();
2844        }
2845        let mut sheet = b.finish();
2846        // Overlays: row1 -> Text("X"), row4 -> Number(99)
2847        let (c1, o1) = sheet.chunk_of_row(1).unwrap();
2848        sheet.columns[0].chunks[c1]
2849            .overlay
2850            .set(o1, OverlayValue::Text(Arc::from("X")));
2851        let (c4, o4) = sheet.chunk_of_row(4).unwrap();
2852        sheet.columns[0].chunks[c4]
2853            .overlay
2854            .set(o4, OverlayValue::Number(99.0));
2855
2856        let av = sheet.range_view(0, 0, 5, 0);
2857        // Validate numbers_slices: row1 should be null (text overlay), row4 should be 99.0, others base
2858        let mut numeric: Vec<Option<f64>> = vec![None; 6];
2859        for res in av.numbers_slices() {
2860            let (row_start, row_len, cols) = res.unwrap();
2861            let a = &cols[0];
2862            for i in 0..row_len {
2863                let idx = row_start + i;
2864                numeric[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
2865            }
2866        }
2867        assert_eq!(numeric[0], Some(1.0));
2868        assert_eq!(numeric[1], None); // overshadowed by text overlay
2869        assert_eq!(numeric[2], Some(3.0));
2870        assert_eq!(numeric[3], Some(4.0));
2871        assert_eq!(numeric[4], Some(99.0));
2872        assert_eq!(numeric[5], Some(6.0));
2873
2874        // Validate text_slices: row1 has "X", others null
2875        let mut texts: Vec<Option<String>> = vec![None; 6];
2876        for res in av.text_slices() {
2877            let (row_start, row_len, cols) = res.unwrap();
2878            let a = cols[0].as_any().downcast_ref::<StringArray>().unwrap();
2879            for i in 0..row_len {
2880                let idx = row_start + i;
2881                texts[idx] = if a.is_null(i) {
2882                    None
2883                } else {
2884                    Some(a.value(i).to_string())
2885                };
2886            }
2887        }
2888        assert_eq!(texts[1].as_deref(), Some("X"));
2889        assert!(texts[0].is_none());
2890        assert!(texts[2].is_none());
2891        assert!(texts[3].is_none());
2892        assert!(texts[4].is_none());
2893        assert!(texts[5].is_none());
2894    }
2895
2896    #[test]
2897    fn effective_slices_overlay_precedence_booleans() {
2898        // Base booleans over 1 column; overlays include boolean and non-boolean types.
2899        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
2900        for i in 0..6 {
2901            let v = if i % 2 == 0 {
2902                LiteralValue::Boolean(true)
2903            } else {
2904                LiteralValue::Boolean(false)
2905            };
2906            b.append_row(&[v]).unwrap();
2907        }
2908        let mut sheet = b.finish();
2909        // Overlays: row1 -> Boolean(true), row2 -> Text("T")
2910        let (c1, o1) = sheet.chunk_of_row(1).unwrap();
2911        sheet.columns[0].chunks[c1]
2912            .overlay
2913            .set(o1, OverlayValue::Boolean(true));
2914        let (c2, o2) = sheet.chunk_of_row(2).unwrap();
2915        sheet.columns[0].chunks[c2]
2916            .overlay
2917            .set(o2, OverlayValue::Text(Arc::from("T")));
2918
2919        let av = sheet.range_view(0, 0, 5, 0);
2920        // Validate booleans_slices: row1 should be true (overlay), row2 should be null (text overlay), others base
2921        let mut bools: Vec<Option<bool>> = vec![None; 6];
2922        for res in av.booleans_slices() {
2923            let (row_start, row_len, cols) = res.unwrap();
2924            let a = &cols[0];
2925            for i in 0..row_len {
2926                let idx = row_start + i;
2927                bools[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
2928            }
2929        }
2930        assert_eq!(bools[0], Some(true));
2931        assert_eq!(bools[1], Some(true)); // overlay to true
2932        assert_eq!(bools[2], None); // overshadowed by text overlay
2933        // spot-check others remain base
2934        assert_eq!(bools[3], Some(false));
2935    }
2936
2937    #[test]
2938    fn effective_slices_overlay_precedence_errors() {
2939        // Base numbers; overlay an error at one row and ensure errors_slices reflect it.
2940        let mut b = IngestBuilder::new("S", 1, 3, crate::engine::DateSystem::Excel1900);
2941        for i in 0..6 {
2942            b.append_row(&[LiteralValue::Number((i + 1) as f64)])
2943                .unwrap();
2944        }
2945        let mut sheet = b.finish();
2946        // Overlay error at row 4
2947        let (c4, o4) = sheet.chunk_of_row(4).unwrap();
2948        sheet.columns[0].chunks[c4]
2949            .overlay
2950            .set(o4, OverlayValue::Error(map_error_code(ExcelErrorKind::Div)));
2951
2952        let av = sheet.range_view(0, 0, 5, 0);
2953        let mut errs: Vec<Option<u8>> = vec![None; 6];
2954        for res in av.errors_slices() {
2955            let (row_start, row_len, cols) = res.unwrap();
2956            let a = &cols[0];
2957            for i in 0..row_len {
2958                let idx = row_start + i;
2959                errs[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
2960            }
2961        }
2962        assert_eq!(errs[4], Some(map_error_code(ExcelErrorKind::Div)));
2963        assert!(errs[3].is_none());
2964    }
2965}