Skip to main content

formualizer_eval/arrow_store/
mod.rs

1use arrow_array::Array;
2use arrow_array::new_null_array;
3use arrow_schema::DataType;
4use chrono::Timelike;
5use std::sync::Arc;
6
7use arrow_array::builder::{BooleanBuilder, Float64Builder, StringBuilder, UInt8Builder};
8use arrow_array::{ArrayRef, BooleanArray, Float64Array, StringArray, UInt8Array, UInt32Array};
9use once_cell::sync::OnceCell;
10
11use formualizer_common::{ExcelError, ExcelErrorKind, LiteralValue};
12use rustc_hash::FxHashMap;
13use std::collections::HashMap;
14
/// One-byte discriminator describing which kind of value a row holds.
///
/// The explicit discriminants are the byte values written into the `UInt8`
/// tag lane and read back by `TypeTag::from_u8`.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TypeTag {
    /// No value.
    Empty = 0,
    /// Numeric value.
    Number = 1,
    /// Boolean value.
    Boolean = 2,
    /// Text value.
    Text = 3,
    /// Error value.
    Error = 4,
    /// Reserved for future temporal lanes.
    DateTime = 5,
    /// Reserved.
    Duration = 6,
    /// Value not yet available.
    Pending = 7,
}
28
29impl TypeTag {
30    fn from_value(v: &LiteralValue) -> Self {
31        match v {
32            LiteralValue::Empty => TypeTag::Empty,
33            LiteralValue::Int(_) | LiteralValue::Number(_) => TypeTag::Number,
34            LiteralValue::Boolean(_) => TypeTag::Boolean,
35            LiteralValue::Text(_) => TypeTag::Text,
36            LiteralValue::Error(_) => TypeTag::Error,
37            LiteralValue::Date(_) | LiteralValue::DateTime(_) | LiteralValue::Time(_) => {
38                TypeTag::DateTime
39            }
40            LiteralValue::Duration(_) => TypeTag::Duration,
41            LiteralValue::Pending => TypeTag::Pending,
42            LiteralValue::Array(_) => TypeTag::Error, // arrays not storable in a single cell lane
43        }
44    }
45}
46
47impl TypeTag {
48    #[inline]
49    pub fn from_u8(b: u8) -> Self {
50        match b {
51            x if x == TypeTag::Empty as u8 => TypeTag::Empty,
52            x if x == TypeTag::Number as u8 => TypeTag::Number,
53            x if x == TypeTag::Boolean as u8 => TypeTag::Boolean,
54            x if x == TypeTag::Text as u8 => TypeTag::Text,
55            x if x == TypeTag::Error as u8 => TypeTag::Error,
56            x if x == TypeTag::DateTime as u8 => TypeTag::DateTime,
57            x if x == TypeTag::Duration as u8 => TypeTag::Duration,
58            x if x == TypeTag::Pending as u8 => TypeTag::Pending,
59            _ => TypeTag::Empty,
60        }
61    }
62}
63
/// Per-chunk bookkeeping: row count plus a non-null count for each value lane.
#[derive(Clone, Copy, Debug, Default)]
pub struct ColumnChunkMeta {
    /// Number of rows in the chunk.
    pub len: usize,
    /// Count of non-null entries in the numeric lane.
    pub non_null_num: usize,
    /// Count of non-null entries in the boolean lane.
    pub non_null_bool: usize,
    /// Count of non-null entries in the text lane.
    pub non_null_text: usize,
    /// Count of non-null entries in the error lane.
    pub non_null_err: usize,
}
72
73#[derive(Debug, Clone)]
74pub struct ColumnChunk {
75    pub numbers: Option<Arc<Float64Array>>,
76    pub booleans: Option<Arc<BooleanArray>>,
77    pub text: Option<ArrayRef>,          // Utf8 for Phase A
78    pub errors: Option<Arc<UInt8Array>>, // compact error code (UInt8)
79    pub type_tag: Arc<UInt8Array>,
80    pub formula_id: Option<Arc<UInt32Array>>, // reserved for Phase A+
81    pub meta: ColumnChunkMeta,
82    // Lazy null providers (per-chunk)
83    lazy_null_numbers: OnceCell<Arc<Float64Array>>,
84    lazy_null_booleans: OnceCell<Arc<BooleanArray>>,
85    lazy_null_text: OnceCell<ArrayRef>,
86    lazy_null_errors: OnceCell<Arc<UInt8Array>>,
87    // Cache: lowered text lane (ASCII lower), nulls preserved
88    lowered_text: OnceCell<ArrayRef>,
89    // Phase C: per-chunk overlay (delta edits since last compaction)
90    pub overlay: Overlay,
91    // Phase 0/1: separate computed overlay (formula/spill outputs)
92    pub computed_overlay: Overlay,
93}
94
95impl ColumnChunk {
96    #[inline]
97    pub fn len(&self) -> usize {
98        self.type_tag.len()
99    }
100    #[inline]
101    pub fn is_empty(&self) -> bool {
102        self.len() == 0
103    }
104    #[inline]
105    pub fn numbers_or_null(&self) -> Arc<Float64Array> {
106        if let Some(a) = &self.numbers {
107            return a.clone();
108        }
109        self.lazy_null_numbers
110            .get_or_init(|| {
111                let arr = new_null_array(&DataType::Float64, self.len());
112                Arc::new(arr.as_any().downcast_ref::<Float64Array>().unwrap().clone())
113            })
114            .clone()
115    }
116    #[inline]
117    pub fn booleans_or_null(&self) -> Arc<BooleanArray> {
118        if let Some(a) = &self.booleans {
119            return a.clone();
120        }
121        self.lazy_null_booleans
122            .get_or_init(|| {
123                let arr = new_null_array(&DataType::Boolean, self.len());
124                Arc::new(arr.as_any().downcast_ref::<BooleanArray>().unwrap().clone())
125            })
126            .clone()
127    }
128    #[inline]
129    pub fn errors_or_null(&self) -> Arc<UInt8Array> {
130        if let Some(a) = &self.errors {
131            return a.clone();
132        }
133        self.lazy_null_errors
134            .get_or_init(|| {
135                let arr = new_null_array(&DataType::UInt8, self.len());
136                Arc::new(arr.as_any().downcast_ref::<UInt8Array>().unwrap().clone())
137            })
138            .clone()
139    }
140    #[inline]
141    pub fn text_or_null(&self) -> ArrayRef {
142        if let Some(a) = &self.text {
143            return a.clone();
144        }
145        self.lazy_null_text
146            .get_or_init(|| new_null_array(&DataType::Utf8, self.len()))
147            .clone()
148    }
149
150    /// Lowercased text lane (ASCII lower), with nulls preserved. Cached per chunk.
151    pub fn text_lower_or_null(&self) -> ArrayRef {
152        if let Some(a) = self.lowered_text.get() {
153            return a.clone();
154        }
155        // Lowercase when text present; else return null Utf8
156        let out: ArrayRef = if let Some(txt) = &self.text {
157            let sa = txt.as_any().downcast_ref::<StringArray>().unwrap();
158            let mut b = arrow_array::builder::StringBuilder::with_capacity(sa.len(), sa.len() * 8);
159            for i in 0..sa.len() {
160                if sa.is_null(i) {
161                    b.append_null();
162                } else {
163                    b.append_value(sa.value(i).to_ascii_lowercase());
164                }
165            }
166            let lowered = b.finish();
167            Arc::new(lowered)
168        } else {
169            new_null_array(&DataType::Utf8, self.len())
170        };
171        self.lowered_text.get_or_init(|| out.clone());
172        out
173    }
174}
175
176#[derive(Debug, Clone)]
177pub struct ArrowColumn {
178    pub chunks: Vec<ColumnChunk>,
179    pub sparse_chunks: FxHashMap<usize, ColumnChunk>,
180    pub index: u32,
181}
182
183impl ArrowColumn {
184    #[inline]
185    pub fn chunk(&self, idx: usize) -> Option<&ColumnChunk> {
186        if idx < self.chunks.len() {
187            Some(&self.chunks[idx])
188        } else {
189            self.sparse_chunks.get(&idx)
190        }
191    }
192
193    #[inline]
194    pub fn chunk_mut(&mut self, idx: usize) -> Option<&mut ColumnChunk> {
195        if idx < self.chunks.len() {
196            Some(&mut self.chunks[idx])
197        } else {
198            self.sparse_chunks.get_mut(&idx)
199        }
200    }
201
202    #[inline]
203    pub fn has_sparse_chunks(&self) -> bool {
204        !self.sparse_chunks.is_empty()
205    }
206
207    #[inline]
208    pub fn total_chunk_count(&self) -> usize {
209        self.chunks.len() + self.sparse_chunks.len()
210    }
211}
212
213#[derive(Debug, Clone)]
214pub struct ArrowSheet {
215    pub name: Arc<str>,
216    pub columns: Vec<ArrowColumn>,
217    pub nrows: u32,
218    pub chunk_starts: Vec<usize>,
219    /// Preferred chunk size (rows) for capacity growth operations.
220    ///
221    /// For Arrow-ingested sheets this matches the ingest `chunk_rows`. For sparse/overlay-created
222    /// sheets this defaults to 32k to avoid creating thousands of tiny chunks during growth.
223    pub chunk_rows: usize,
224}
225
226#[derive(Debug, Default, Clone)]
227pub struct SheetStore {
228    pub sheets: Vec<ArrowSheet>,
229}
230
231impl SheetStore {
232    pub fn sheet(&self, name: &str) -> Option<&ArrowSheet> {
233        self.sheets.iter().find(|s| s.name.as_ref() == name)
234    }
235    pub fn sheet_mut(&mut self, name: &str) -> Option<&mut ArrowSheet> {
236        self.sheets.iter_mut().find(|s| s.name.as_ref() == name)
237    }
238}
239
240/// Ingestion builder that writes per-column Arrow arrays with a lane/tag design.
241pub struct IngestBuilder {
242    name: Arc<str>,
243    ncols: usize,
244    chunk_rows: usize,
245    date_system: crate::engine::DateSystem,
246
247    // Per-column active builders for current chunk
248    num_builders: Vec<Float64Builder>,
249    bool_builders: Vec<BooleanBuilder>,
250    text_builders: Vec<StringBuilder>,
251    err_builders: Vec<UInt8Builder>,
252    tag_builders: Vec<UInt8Builder>,
253
254    // Per-column per-lane non-null counters for current chunk
255    lane_counts: Vec<LaneCounts>,
256
257    // Accumulated chunks
258    chunks: Vec<Vec<ColumnChunk>>, // indexed by col
259    row_in_chunk: usize,
260    total_rows: u32,
261}
262
/// Non-null tallies for one column's value lanes within the current chunk.
#[derive(Clone, Copy, Debug, Default)]
struct LaneCounts {
    /// Non-null entries written to the numeric lane.
    n_num: usize,
    /// Non-null entries written to the boolean lane.
    n_bool: usize,
    /// Non-null entries written to the text lane.
    n_text: usize,
    /// Non-null entries written to the error lane.
    n_err: usize,
}
270
271impl IngestBuilder {
272    pub fn new(
273        sheet_name: &str,
274        ncols: usize,
275        chunk_rows: usize,
276        date_system: crate::engine::DateSystem,
277    ) -> Self {
278        let mut chunks = Vec::with_capacity(ncols);
279        chunks.resize_with(ncols, Vec::new);
280        Self {
281            name: Arc::from(sheet_name.to_string()),
282            ncols,
283            chunk_rows: chunk_rows.max(1),
284            date_system,
285            num_builders: (0..ncols)
286                .map(|_| Float64Builder::with_capacity(chunk_rows))
287                .collect(),
288            bool_builders: (0..ncols)
289                .map(|_| BooleanBuilder::with_capacity(chunk_rows))
290                .collect(),
291            text_builders: (0..ncols)
292                .map(|_| StringBuilder::with_capacity(chunk_rows, chunk_rows * 12))
293                .collect(),
294            err_builders: (0..ncols)
295                .map(|_| UInt8Builder::with_capacity(chunk_rows))
296                .collect(),
297            tag_builders: (0..ncols)
298                .map(|_| UInt8Builder::with_capacity(chunk_rows))
299                .collect(),
300            lane_counts: vec![LaneCounts::default(); ncols],
301            chunks,
302            row_in_chunk: 0,
303            total_rows: 0,
304        }
305    }
306
307    /// Zero-allocation row append from typed cell tokens (no LiteralValue).
308    /// Text borrows are copied into the internal StringBuilder.
309    pub fn append_row_cells<'a>(&mut self, row: &[CellIngest<'a>]) -> Result<(), ExcelError> {
310        assert_eq!(row.len(), self.ncols, "row width mismatch");
311        for (c, cell) in row.iter().enumerate() {
312            match cell {
313                CellIngest::Empty => {
314                    self.tag_builders[c].append_value(TypeTag::Empty as u8);
315                    self.num_builders[c].append_null();
316                    self.bool_builders[c].append_null();
317                    self.text_builders[c].append_null();
318                    self.err_builders[c].append_null();
319                }
320                CellIngest::Number(n) => {
321                    self.tag_builders[c].append_value(TypeTag::Number as u8);
322                    self.num_builders[c].append_value(*n);
323                    self.lane_counts[c].n_num += 1;
324                    self.bool_builders[c].append_null();
325                    self.text_builders[c].append_null();
326                    self.err_builders[c].append_null();
327                }
328                CellIngest::Boolean(b) => {
329                    self.tag_builders[c].append_value(TypeTag::Boolean as u8);
330                    self.num_builders[c].append_null();
331                    self.bool_builders[c].append_value(*b);
332                    self.lane_counts[c].n_bool += 1;
333                    self.text_builders[c].append_null();
334                    self.err_builders[c].append_null();
335                }
336                CellIngest::Text(s) => {
337                    self.tag_builders[c].append_value(TypeTag::Text as u8);
338                    self.num_builders[c].append_null();
339                    self.bool_builders[c].append_null();
340                    self.text_builders[c].append_value(s);
341                    self.lane_counts[c].n_text += 1;
342                    self.err_builders[c].append_null();
343                }
344                CellIngest::ErrorCode(code) => {
345                    self.tag_builders[c].append_value(TypeTag::Error as u8);
346                    self.num_builders[c].append_null();
347                    self.bool_builders[c].append_null();
348                    self.text_builders[c].append_null();
349                    self.err_builders[c].append_value(*code);
350                    self.lane_counts[c].n_err += 1;
351                }
352                CellIngest::DateSerial(serial) => {
353                    self.tag_builders[c].append_value(TypeTag::DateTime as u8);
354                    self.num_builders[c].append_value(*serial);
355                    self.lane_counts[c].n_num += 1;
356                    self.bool_builders[c].append_null();
357                    self.text_builders[c].append_null();
358                    self.err_builders[c].append_null();
359                }
360                CellIngest::Pending => {
361                    self.tag_builders[c].append_value(TypeTag::Pending as u8);
362                    self.num_builders[c].append_null();
363                    self.bool_builders[c].append_null();
364                    self.text_builders[c].append_null();
365                    self.err_builders[c].append_null();
366                }
367            }
368        }
369        self.row_in_chunk += 1;
370        self.total_rows += 1;
371        if self.row_in_chunk >= self.chunk_rows {
372            self.finish_chunk();
373        }
374        Ok(())
375    }
376
377    /// Streaming row append from an iterator of typed cell tokens.
378    /// Requires an `ExactSizeIterator` to validate row width without materializing a Vec.
379    pub fn append_row_cells_iter<'a, I>(&mut self, iter: I) -> Result<(), ExcelError>
380    where
381        I: ExactSizeIterator<Item = CellIngest<'a>>,
382    {
383        assert_eq!(iter.len(), self.ncols, "row width mismatch");
384        for (c, cell) in iter.enumerate() {
385            match cell {
386                CellIngest::Empty => {
387                    self.tag_builders[c].append_value(TypeTag::Empty as u8);
388                    self.num_builders[c].append_null();
389                    self.bool_builders[c].append_null();
390                    self.text_builders[c].append_null();
391                    self.err_builders[c].append_null();
392                }
393                CellIngest::Number(n) => {
394                    self.tag_builders[c].append_value(TypeTag::Number as u8);
395                    self.num_builders[c].append_value(n);
396                    self.lane_counts[c].n_num += 1;
397                    self.bool_builders[c].append_null();
398                    self.text_builders[c].append_null();
399                    self.err_builders[c].append_null();
400                }
401                CellIngest::Boolean(b) => {
402                    self.tag_builders[c].append_value(TypeTag::Boolean as u8);
403                    self.num_builders[c].append_null();
404                    self.bool_builders[c].append_value(b);
405                    self.lane_counts[c].n_bool += 1;
406                    self.text_builders[c].append_null();
407                    self.err_builders[c].append_null();
408                }
409                CellIngest::Text(s) => {
410                    self.tag_builders[c].append_value(TypeTag::Text as u8);
411                    self.num_builders[c].append_null();
412                    self.bool_builders[c].append_null();
413                    self.text_builders[c].append_value(s);
414                    self.lane_counts[c].n_text += 1;
415                    self.err_builders[c].append_null();
416                }
417                CellIngest::ErrorCode(code) => {
418                    self.tag_builders[c].append_value(TypeTag::Error as u8);
419                    self.num_builders[c].append_null();
420                    self.bool_builders[c].append_null();
421                    self.text_builders[c].append_null();
422                    self.err_builders[c].append_value(code);
423                    self.lane_counts[c].n_err += 1;
424                }
425                CellIngest::DateSerial(serial) => {
426                    self.tag_builders[c].append_value(TypeTag::DateTime as u8);
427                    self.num_builders[c].append_value(serial);
428                    self.lane_counts[c].n_num += 1;
429                    self.bool_builders[c].append_null();
430                    self.text_builders[c].append_null();
431                    self.err_builders[c].append_null();
432                }
433                CellIngest::Pending => {
434                    self.tag_builders[c].append_value(TypeTag::Pending as u8);
435                    self.num_builders[c].append_null();
436                    self.bool_builders[c].append_null();
437                    self.text_builders[c].append_null();
438                    self.err_builders[c].append_null();
439                }
440            }
441        }
442        self.row_in_chunk += 1;
443        self.total_rows += 1;
444        if self.row_in_chunk >= self.chunk_rows {
445            self.finish_chunk();
446        }
447        Ok(())
448    }
449
450    /// Append a single row of values. Length must match `ncols`.
451    pub fn append_row(&mut self, row: &[LiteralValue]) -> Result<(), ExcelError> {
452        assert_eq!(row.len(), self.ncols, "row width mismatch");
453
454        for (c, v) in row.iter().enumerate() {
455            let tag = TypeTag::from_value(v) as u8;
456            self.tag_builders[c].append_value(tag);
457
458            match v {
459                LiteralValue::Empty => {
460                    self.num_builders[c].append_null();
461                    self.bool_builders[c].append_null();
462                    self.text_builders[c].append_null();
463                    self.err_builders[c].append_null();
464                }
465                LiteralValue::Int(i) => {
466                    self.num_builders[c].append_value(*i as f64);
467                    self.lane_counts[c].n_num += 1;
468                    self.bool_builders[c].append_null();
469                    self.text_builders[c].append_null();
470                    self.err_builders[c].append_null();
471                }
472                LiteralValue::Number(n) => {
473                    self.num_builders[c].append_value(*n);
474                    self.lane_counts[c].n_num += 1;
475                    self.bool_builders[c].append_null();
476                    self.text_builders[c].append_null();
477                    self.err_builders[c].append_null();
478                }
479                LiteralValue::Boolean(b) => {
480                    self.num_builders[c].append_null();
481                    self.bool_builders[c].append_value(*b);
482                    self.lane_counts[c].n_bool += 1;
483                    self.text_builders[c].append_null();
484                    self.err_builders[c].append_null();
485                }
486                LiteralValue::Text(s) => {
487                    self.num_builders[c].append_null();
488                    self.bool_builders[c].append_null();
489                    self.text_builders[c].append_value(s);
490                    self.lane_counts[c].n_text += 1;
491                    self.err_builders[c].append_null();
492                }
493                LiteralValue::Error(e) => {
494                    self.num_builders[c].append_null();
495                    self.bool_builders[c].append_null();
496                    self.text_builders[c].append_null();
497                    self.err_builders[c].append_value(map_error_code(e.kind));
498                    self.lane_counts[c].n_err += 1;
499                }
500                // Phase A: coerce temporal to serials in numeric lane with DateTime tag
501                LiteralValue::Date(d) => {
502                    let dt = d.and_hms_opt(0, 0, 0).unwrap();
503                    let serial =
504                        crate::builtins::datetime::datetime_to_serial_for(self.date_system, &dt);
505                    self.num_builders[c].append_value(serial);
506                    self.lane_counts[c].n_num += 1;
507                    self.bool_builders[c].append_null();
508                    self.text_builders[c].append_null();
509                    self.err_builders[c].append_null();
510                }
511                LiteralValue::DateTime(dt) => {
512                    let serial =
513                        crate::builtins::datetime::datetime_to_serial_for(self.date_system, dt);
514                    self.num_builders[c].append_value(serial);
515                    self.lane_counts[c].n_num += 1;
516                    self.bool_builders[c].append_null();
517                    self.text_builders[c].append_null();
518                    self.err_builders[c].append_null();
519                }
520                LiteralValue::Time(t) => {
521                    let serial = t.num_seconds_from_midnight() as f64 / 86_400.0;
522                    self.num_builders[c].append_value(serial);
523                    self.lane_counts[c].n_num += 1;
524                    self.bool_builders[c].append_null();
525                    self.text_builders[c].append_null();
526                    self.err_builders[c].append_null();
527                }
528                LiteralValue::Duration(dur) => {
529                    let serial = dur.num_seconds() as f64 / 86_400.0;
530                    self.num_builders[c].append_value(serial);
531                    self.lane_counts[c].n_num += 1;
532                    self.bool_builders[c].append_null();
533                    self.text_builders[c].append_null();
534                    self.err_builders[c].append_null();
535                }
536                LiteralValue::Array(_) => {
537                    // Not allowed as a stored scalar; mark as error kind VALUE
538                    self.num_builders[c].append_null();
539                    self.bool_builders[c].append_null();
540                    self.text_builders[c].append_null();
541                    self.err_builders[c].append_value(map_error_code(ExcelErrorKind::Value));
542                    self.lane_counts[c].n_err += 1;
543                }
544                LiteralValue::Pending => {
545                    // Pending: tag only; all lanes remain null (no error)
546                    self.num_builders[c].append_null();
547                    self.bool_builders[c].append_null();
548                    self.text_builders[c].append_null();
549                    self.err_builders[c].append_null();
550                }
551            }
552        }
553
554        self.row_in_chunk += 1;
555        self.total_rows += 1;
556
557        if self.row_in_chunk >= self.chunk_rows {
558            self.finish_chunk();
559        }
560
561        Ok(())
562    }
563
564    fn finish_chunk(&mut self) {
565        if self.row_in_chunk == 0 {
566            return;
567        }
568        for c in 0..self.ncols {
569            let len = self.row_in_chunk;
570            let numbers_arc: Option<Arc<Float64Array>> = if self.lane_counts[c].n_num == 0 {
571                None
572            } else {
573                Some(Arc::new(self.num_builders[c].finish()))
574            };
575            let booleans_arc: Option<Arc<BooleanArray>> = if self.lane_counts[c].n_bool == 0 {
576                None
577            } else {
578                Some(Arc::new(self.bool_builders[c].finish()))
579            };
580            let text_ref: Option<ArrayRef> = if self.lane_counts[c].n_text == 0 {
581                None
582            } else {
583                Some(Arc::new(self.text_builders[c].finish()))
584            };
585            let errors_arc: Option<Arc<UInt8Array>> = if self.lane_counts[c].n_err == 0 {
586                None
587            } else {
588                Some(Arc::new(self.err_builders[c].finish()))
589            };
590            let tags: UInt8Array = self.tag_builders[c].finish();
591
592            let chunk = ColumnChunk {
593                numbers: numbers_arc,
594                booleans: booleans_arc,
595                text: text_ref,
596                errors: errors_arc,
597                type_tag: Arc::new(tags),
598                formula_id: None,
599                meta: ColumnChunkMeta {
600                    len,
601                    non_null_num: self.lane_counts[c].n_num,
602                    non_null_bool: self.lane_counts[c].n_bool,
603                    non_null_text: self.lane_counts[c].n_text,
604                    non_null_err: self.lane_counts[c].n_err,
605                },
606                lazy_null_numbers: OnceCell::new(),
607                lazy_null_booleans: OnceCell::new(),
608                lazy_null_text: OnceCell::new(),
609                lazy_null_errors: OnceCell::new(),
610                lowered_text: OnceCell::new(),
611                overlay: Overlay::new(),
612                computed_overlay: Overlay::new(),
613            };
614            self.chunks[c].push(chunk);
615
616            // re-init builders for next chunk
617            self.num_builders[c] = Float64Builder::with_capacity(self.chunk_rows);
618            self.bool_builders[c] = BooleanBuilder::with_capacity(self.chunk_rows);
619            self.text_builders[c] =
620                StringBuilder::with_capacity(self.chunk_rows, self.chunk_rows * 12);
621            self.err_builders[c] = UInt8Builder::with_capacity(self.chunk_rows);
622            self.tag_builders[c] = UInt8Builder::with_capacity(self.chunk_rows);
623            self.lane_counts[c] = LaneCounts::default();
624        }
625        self.row_in_chunk = 0;
626    }
627
628    pub fn finish(mut self) -> ArrowSheet {
629        // flush partial chunk
630        if self.row_in_chunk > 0 {
631            self.finish_chunk();
632        }
633
634        let mut columns = Vec::with_capacity(self.ncols);
635        for (idx, chunks) in self.chunks.into_iter().enumerate() {
636            columns.push(ArrowColumn {
637                chunks,
638                sparse_chunks: FxHashMap::default(),
639                index: idx as u32,
640            });
641        }
642        // Precompute chunk starts from first column and enforce alignment across columns
643        let mut chunk_starts: Vec<usize> = Vec::new();
644        if let Some(col0) = columns.first() {
645            let chunks_len0 = col0.chunks.len();
646            for (ci, col) in columns.iter().enumerate() {
647                if col.chunks.len() != chunks_len0 {
648                    panic!(
649                        "ArrowSheet chunk misalignment: column {} chunks={} != {}",
650                        ci,
651                        col.chunks.len(),
652                        chunks_len0
653                    );
654                }
655            }
656            let mut cur = 0usize;
657            for i in 0..chunks_len0 {
658                let len_i = col0.chunks[i].type_tag.len();
659                for (ci, col) in columns.iter().enumerate() {
660                    let got = col.chunks[i].type_tag.len();
661                    if got != len_i {
662                        panic!(
663                            "ArrowSheet chunk row-length misalignment at chunk {i}: col {ci} len={got} != {len_i}"
664                        );
665                    }
666                }
667                chunk_starts.push(cur);
668                cur += len_i;
669            }
670        }
671        ArrowSheet {
672            name: self.name,
673            columns,
674            nrows: self.total_rows,
675            chunk_starts,
676            chunk_rows: self.chunk_rows,
677        }
678    }
679}
680
681pub fn map_error_code(kind: ExcelErrorKind) -> u8 {
682    match kind {
683        ExcelErrorKind::Null => 1,
684        ExcelErrorKind::Ref => 2,
685        ExcelErrorKind::Name => 3,
686        ExcelErrorKind::Value => 4,
687        ExcelErrorKind::Div => 5,
688        ExcelErrorKind::Na => 6,
689        ExcelErrorKind::Num => 7,
690        ExcelErrorKind::Error => 8,
691        ExcelErrorKind::NImpl => 9,
692        ExcelErrorKind::Spill => 10,
693        ExcelErrorKind::Calc => 11,
694        ExcelErrorKind::Circ => 12,
695        ExcelErrorKind::Cancelled => 13,
696    }
697}
698
699pub fn unmap_error_code(code: u8) -> ExcelErrorKind {
700    match code {
701        1 => ExcelErrorKind::Null,
702        2 => ExcelErrorKind::Ref,
703        3 => ExcelErrorKind::Name,
704        4 => ExcelErrorKind::Value,
705        5 => ExcelErrorKind::Div,
706        6 => ExcelErrorKind::Na,
707        7 => ExcelErrorKind::Num,
708        8 => ExcelErrorKind::Error,
709        9 => ExcelErrorKind::NImpl,
710        10 => ExcelErrorKind::Spill,
711        11 => ExcelErrorKind::Calc,
712        12 => ExcelErrorKind::Circ,
713        13 => ExcelErrorKind::Cancelled,
714        _ => ExcelErrorKind::Error,
715    }
716}
717
718// ─────────────────────────── Overlay (Phase C) ────────────────────────────
719
/// Zero-allocation cell token for ingestion.
///
/// Every payload is either a plain scalar or a borrowed string slice, so the
/// token is cheap to copy and can be compared and debug-printed.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CellIngest<'a> {
    /// No value.
    Empty,
    /// Numeric value.
    Number(f64),
    /// Boolean value.
    Boolean(bool),
    /// Borrowed text; the builder copies it on append.
    Text(&'a str),
    /// Compact error code (see `map_error_code`).
    ErrorCode(u8),
    /// Date/time already converted to a serial number.
    DateSerial(f64),
    /// Value not yet available.
    Pending,
}
730
/// A single overlaid cell value.
#[derive(Debug, Clone)]
pub enum OverlayValue {
    /// No value.
    Empty,
    /// Numeric value.
    Number(f64),
    /// Boolean value.
    Boolean(bool),
    /// Shared text value.
    Text(Arc<str>),
    /// Compact error code.
    Error(u8),
    /// Value not yet available.
    Pending,
}

/// Sparse map from an offset to the value overlaid at that offset.
#[derive(Debug, Default, Clone)]
pub struct Overlay {
    map: HashMap<usize, OverlayValue>,
}

impl Overlay {
    /// Create an empty overlay.
    pub fn new() -> Self {
        Self::default()
    }

    /// Value overlaid at `off`, if any.
    #[inline]
    pub fn get(&self, off: usize) -> Option<&OverlayValue> {
        self.map.get(&off)
    }

    /// Overlay `v` at `off`, replacing any previous value there.
    #[inline]
    pub fn set(&mut self, off: usize, v: OverlayValue) {
        self.map.insert(off, v);
    }

    /// Remove every overlaid value.
    #[inline]
    pub fn clear(&mut self) {
        self.map.clear();
    }

    /// Number of overlaid offsets.
    #[inline]
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// `true` when nothing is overlaid.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// `true` when at least one overlaid offset lies inside `range`.
    ///
    /// Walks whichever side is smaller: it probes each offset of a short
    /// range against the map, or scans the map's keys when the range is at
    /// least as long as the map. An empty or reversed range yields `false`.
    #[inline]
    pub fn any_in_range(&self, range: core::ops::Range<usize>) -> bool {
        if range.len() < self.map.len() {
            (range.start..range.end).any(|off| self.map.contains_key(&off))
        } else {
            self.map.keys().any(|k| range.contains(k))
        }
    }
}
777
778impl ArrowSheet {
779    /// Return a summary of each column's chunk counts, total rows, and lane presence.
780    pub fn shape(&self) -> Vec<ColumnShape> {
781        self.columns
782            .iter()
783            .map(|c| {
784                let chunks = c.chunks.len();
785                let rows = self.nrows as usize;
786                let has_num = c.chunks.iter().any(|ch| ch.meta.non_null_num > 0);
787                let has_bool = c.chunks.iter().any(|ch| ch.meta.non_null_bool > 0);
788                let has_text = c.chunks.iter().any(|ch| ch.meta.non_null_text > 0);
789                let has_err = c.chunks.iter().any(|ch| ch.meta.non_null_err > 0);
790                ColumnShape {
791                    index: c.index,
792                    chunks,
793                    rows,
794                    has_num,
795                    has_bool,
796                    has_text,
797                    has_err,
798                }
799            })
800            .collect()
801    }
802
803    pub fn range_view(
804        &self,
805        sr: usize,
806        sc: usize,
807        er: usize,
808        ec: usize,
809    ) -> crate::engine::range_view::RangeView<'_> {
810        let r0 = er.checked_sub(sr).map(|d| d + 1).unwrap_or(0);
811        let c0 = ec.checked_sub(sc).map(|d| d + 1).unwrap_or(0);
812        let (rows, cols) = if r0 == 0 || c0 == 0 { (0, 0) } else { (r0, c0) };
813        crate::engine::range_view::RangeView::new(
814            crate::engine::range_view::RangeBacking::Borrowed(self),
815            sr,
816            sc,
817            er,
818            ec,
819            rows,
820            cols,
821        )
822    }
823
824    /// Ensure capacity to address at least `target_rows` rows by extending the row chunk map.
825    ///
826    /// This updates `chunk_starts`/`nrows` but does **not** eagerly densify all columns with
827    /// new empty chunks. Missing chunks are treated as all-empty and can be materialized lazily.
828    pub fn ensure_row_capacity(&mut self, target_rows: usize) {
829        if target_rows as u32 <= self.nrows {
830            return;
831        }
832
833        if self.chunk_starts.is_empty() {
834            self.chunk_starts.push(0);
835        }
836
837        let chunk_size = self.chunk_rows.max(1);
838
839        let mut cur_rows = self.nrows as usize;
840        while cur_rows < target_rows {
841            let len = (target_rows - cur_rows).min(chunk_size.max(1));
842            // Start of the next chunk is the current row count.
843            if self.chunk_starts.last().copied() != Some(cur_rows) {
844                self.chunk_starts.push(cur_rows);
845            }
846            cur_rows += len;
847            self.nrows = cur_rows as u32;
848        }
849    }
850
851    /// Ensure a mutable chunk for a given column/chunk index.
852    ///
853    /// If the chunk is beyond the column's dense chunk vector, it is stored in `sparse_chunks`.
854    pub fn ensure_column_chunk_mut(
855        &mut self,
856        col_idx: usize,
857        ch_idx: usize,
858    ) -> Option<&mut ColumnChunk> {
859        let start = *self.chunk_starts.get(ch_idx)?;
860        let end = self
861            .chunk_starts
862            .get(ch_idx + 1)
863            .copied()
864            .unwrap_or(self.nrows as usize);
865        let len = end.saturating_sub(start);
866
867        let col = self.columns.get_mut(col_idx)?;
868        if ch_idx < col.chunks.len() {
869            return Some(&mut col.chunks[ch_idx]);
870        }
871        Some(
872            col.sparse_chunks
873                .entry(ch_idx)
874                .or_insert_with(|| Self::make_empty_chunk(len)),
875        )
876    }
877
878    /// Return (chunk_idx, in_chunk_offset) for absolute 0-based row.
879    pub fn chunk_of_row(&self, abs_row: usize) -> Option<(usize, usize)> {
880        if abs_row >= self.nrows as usize {
881            return None;
882        }
883        let ch_idx = match self.chunk_starts.binary_search(&abs_row) {
884            Ok(i) => i,
885            Err(0) => 0,
886            Err(i) => i - 1,
887        };
888        let start = self.chunk_starts[ch_idx];
889        Some((ch_idx, abs_row - start))
890    }
891
892    fn recompute_chunk_starts(&mut self) {
893        self.chunk_starts.clear();
894        if let Some(col0) = self.columns.first() {
895            let mut cur = 0usize;
896            for ch in &col0.chunks {
897                self.chunk_starts.push(cur);
898                cur += ch.type_tag.len();
899            }
900        }
901    }
902
903    fn make_empty_chunk(len: usize) -> ColumnChunk {
904        ColumnChunk {
905            numbers: None,
906            booleans: None,
907            text: None,
908            errors: None,
909            type_tag: Arc::new(UInt8Array::from(vec![TypeTag::Empty as u8; len])),
910            formula_id: None,
911            meta: ColumnChunkMeta {
912                len,
913                non_null_num: 0,
914                non_null_bool: 0,
915                non_null_text: 0,
916                non_null_err: 0,
917            },
918            lazy_null_numbers: OnceCell::new(),
919            lazy_null_booleans: OnceCell::new(),
920            lazy_null_text: OnceCell::new(),
921            lazy_null_errors: OnceCell::new(),
922            lowered_text: OnceCell::new(),
923            overlay: Overlay::new(),
924            computed_overlay: Overlay::new(),
925        }
926    }
927
928    fn slice_chunk(ch: &ColumnChunk, off: usize, len: usize) -> ColumnChunk {
929        // Slice type tags
930        use arrow_array::Array;
931        let type_tag: Arc<UInt8Array> = Arc::new(
932            Array::slice(ch.type_tag.as_ref(), off, len)
933                .as_any()
934                .downcast_ref::<UInt8Array>()
935                .unwrap()
936                .clone(),
937        );
938        // Slice numbers if present and keep only if any non-null
939        let numbers: Option<Arc<Float64Array>> = ch.numbers.as_ref().and_then(|a| {
940            let sl = Array::slice(a.as_ref(), off, len);
941            let fa = sl.as_any().downcast_ref::<Float64Array>().unwrap().clone();
942            let nn = len.saturating_sub(fa.null_count());
943            if nn == 0 { None } else { Some(Arc::new(fa)) }
944        });
945        let booleans: Option<Arc<BooleanArray>> = ch.booleans.as_ref().and_then(|a| {
946            let sl = Array::slice(a.as_ref(), off, len);
947            let ba = sl.as_any().downcast_ref::<BooleanArray>().unwrap().clone();
948            let nn = len.saturating_sub(ba.null_count());
949            if nn == 0 { None } else { Some(Arc::new(ba)) }
950        });
951        let text: Option<ArrayRef> = ch.text.as_ref().and_then(|a| {
952            let sl = Array::slice(a.as_ref(), off, len);
953            let sa = sl.as_any().downcast_ref::<StringArray>().unwrap().clone();
954            let nn = len.saturating_sub(sa.null_count());
955            if nn == 0 {
956                None
957            } else {
958                Some(Arc::new(sa) as ArrayRef)
959            }
960        });
961        let errors: Option<Arc<UInt8Array>> = ch.errors.as_ref().and_then(|a| {
962            let sl = Array::slice(a.as_ref(), off, len);
963            let ea = sl.as_any().downcast_ref::<UInt8Array>().unwrap().clone();
964            let nn = len.saturating_sub(ea.null_count());
965            if nn == 0 { None } else { Some(Arc::new(ea)) }
966        });
967        // Split overlays for this slice
968        let mut overlay = Overlay::new();
969        for (k, v) in ch.overlay.map.iter() {
970            if *k >= off && *k < off + len {
971                overlay.set(*k - off, v.clone());
972            }
973        }
974        let mut computed_overlay = Overlay::new();
975        for (k, v) in ch.computed_overlay.map.iter() {
976            if *k >= off && *k < off + len {
977                computed_overlay.set(*k - off, v.clone());
978            }
979        }
980        let non_null_num = numbers.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
981        let non_null_bool = booleans.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
982        let non_null_text = text.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
983        let non_null_err = errors.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
984        ColumnChunk {
985            numbers: numbers.clone(),
986            booleans: booleans.clone(),
987            text: text.clone(),
988            errors: errors.clone(),
989            type_tag,
990            formula_id: None,
991            meta: ColumnChunkMeta {
992                len,
993                non_null_num,
994                non_null_bool,
995                non_null_text,
996                non_null_err,
997            },
998            lazy_null_numbers: OnceCell::new(),
999            lazy_null_booleans: OnceCell::new(),
1000            lazy_null_text: OnceCell::new(),
1001            lazy_null_errors: OnceCell::new(),
1002            lowered_text: OnceCell::new(),
1003            overlay,
1004            computed_overlay,
1005        }
1006    }
1007
1008    /// Heuristic compaction: rebuilds a chunk's base arrays by applying its overlay when
1009    /// overlay density crosses thresholds. Returns true if a rebuild occurred.
1010    pub fn maybe_compact_chunk(
1011        &mut self,
1012        col_idx: usize,
1013        ch_idx: usize,
1014        abs_threshold: usize,
1015        frac_den: usize,
1016    ) -> bool {
1017        if col_idx >= self.columns.len() {
1018            return false;
1019        }
1020
1021        let (len, tags, numbers, booleans, text, errors, non_num, non_bool, non_text, non_err) = {
1022            let Some(ch_ref) = self.columns[col_idx].chunk(ch_idx) else {
1023                return false;
1024            };
1025            let len = ch_ref.type_tag.len();
1026            if len == 0 {
1027                return false;
1028            }
1029
1030            let ov_len = ch_ref.overlay.len();
1031            let den = frac_den.max(1);
1032            let trig = ov_len > (len / den) || ov_len > abs_threshold;
1033            if !trig {
1034                return false;
1035            }
1036
1037            // Rebuild: merge base lanes with overlays row-by-row.
1038            let mut tag_b = UInt8Builder::with_capacity(len);
1039            let mut nb = Float64Builder::with_capacity(len);
1040            let mut bb = BooleanBuilder::with_capacity(len);
1041            let mut sb = StringBuilder::with_capacity(len, len * 8);
1042            let mut eb = UInt8Builder::with_capacity(len);
1043            let mut non_num = 0usize;
1044            let mut non_bool = 0usize;
1045            let mut non_text = 0usize;
1046            let mut non_err = 0usize;
1047
1048            for i in 0..len {
1049                // If overlay present, use it. Otherwise, use base tag+lane.
1050                if let Some(ov) = ch_ref.overlay.get(i) {
1051                    match ov {
1052                        OverlayValue::Empty => {
1053                            tag_b.append_value(TypeTag::Empty as u8);
1054                            nb.append_null();
1055                            bb.append_null();
1056                            sb.append_null();
1057                            eb.append_null();
1058                        }
1059                        OverlayValue::Number(n) => {
1060                            tag_b.append_value(TypeTag::Number as u8);
1061                            nb.append_value(*n);
1062                            non_num += 1;
1063                            bb.append_null();
1064                            sb.append_null();
1065                            eb.append_null();
1066                        }
1067                        OverlayValue::Boolean(b) => {
1068                            tag_b.append_value(TypeTag::Boolean as u8);
1069                            nb.append_null();
1070                            bb.append_value(*b);
1071                            non_bool += 1;
1072                            sb.append_null();
1073                            eb.append_null();
1074                        }
1075                        OverlayValue::Text(s) => {
1076                            tag_b.append_value(TypeTag::Text as u8);
1077                            nb.append_null();
1078                            bb.append_null();
1079                            sb.append_value(s);
1080                            non_text += 1;
1081                            eb.append_null();
1082                        }
1083                        OverlayValue::Error(code) => {
1084                            tag_b.append_value(TypeTag::Error as u8);
1085                            nb.append_null();
1086                            bb.append_null();
1087                            sb.append_null();
1088                            eb.append_value(*code);
1089                            non_err += 1;
1090                        }
1091                        OverlayValue::Pending => {
1092                            tag_b.append_value(TypeTag::Pending as u8);
1093                            nb.append_null();
1094                            bb.append_null();
1095                            sb.append_null();
1096                            eb.append_null();
1097                        }
1098                    }
1099                } else {
1100                    let tag = TypeTag::from_u8(ch_ref.type_tag.value(i));
1101                    match tag {
1102                        TypeTag::Empty => {
1103                            tag_b.append_value(TypeTag::Empty as u8);
1104                            nb.append_null();
1105                            bb.append_null();
1106                            sb.append_null();
1107                            eb.append_null();
1108                        }
1109                        TypeTag::Number | TypeTag::DateTime | TypeTag::Duration => {
1110                            tag_b.append_value(TypeTag::Number as u8);
1111                            if let Some(a) = &ch_ref.numbers {
1112                                let fa = a.as_any().downcast_ref::<Float64Array>().unwrap();
1113                                if fa.is_null(i) {
1114                                    nb.append_null();
1115                                } else {
1116                                    nb.append_value(fa.value(i));
1117                                    non_num += 1;
1118                                }
1119                            } else {
1120                                nb.append_null();
1121                            }
1122                            bb.append_null();
1123                            sb.append_null();
1124                            eb.append_null();
1125                        }
1126                        TypeTag::Boolean => {
1127                            tag_b.append_value(TypeTag::Boolean as u8);
1128                            nb.append_null();
1129                            if let Some(a) = &ch_ref.booleans {
1130                                let ba = a.as_any().downcast_ref::<BooleanArray>().unwrap();
1131                                if ba.is_null(i) {
1132                                    bb.append_null();
1133                                } else {
1134                                    bb.append_value(ba.value(i));
1135                                    non_bool += 1;
1136                                }
1137                            } else {
1138                                bb.append_null();
1139                            }
1140                            sb.append_null();
1141                            eb.append_null();
1142                        }
1143                        TypeTag::Text => {
1144                            tag_b.append_value(TypeTag::Text as u8);
1145                            nb.append_null();
1146                            bb.append_null();
1147                            if let Some(a) = &ch_ref.text {
1148                                let sa = a.as_any().downcast_ref::<StringArray>().unwrap();
1149                                if sa.is_null(i) {
1150                                    sb.append_null();
1151                                } else {
1152                                    sb.append_value(sa.value(i));
1153                                    non_text += 1;
1154                                }
1155                            } else {
1156                                sb.append_null();
1157                            }
1158                            eb.append_null();
1159                        }
1160                        TypeTag::Error => {
1161                            tag_b.append_value(TypeTag::Error as u8);
1162                            nb.append_null();
1163                            bb.append_null();
1164                            sb.append_null();
1165                            if let Some(a) = &ch_ref.errors {
1166                                let ea = a.as_any().downcast_ref::<UInt8Array>().unwrap();
1167                                if ea.is_null(i) {
1168                                    eb.append_null();
1169                                } else {
1170                                    eb.append_value(ea.value(i));
1171                                    non_err += 1;
1172                                }
1173                            } else {
1174                                eb.append_null();
1175                            }
1176                        }
1177                        TypeTag::Pending => {
1178                            tag_b.append_value(TypeTag::Pending as u8);
1179                            nb.append_null();
1180                            bb.append_null();
1181                            sb.append_null();
1182                            eb.append_null();
1183                        }
1184                    }
1185                }
1186            }
1187
1188            let tags = Arc::new(tag_b.finish());
1189            let numbers = {
1190                let a = nb.finish();
1191                if non_num == 0 {
1192                    None
1193                } else {
1194                    Some(Arc::new(a))
1195                }
1196            };
1197            let booleans = {
1198                let a = bb.finish();
1199                if non_bool == 0 {
1200                    None
1201                } else {
1202                    Some(Arc::new(a))
1203                }
1204            };
1205            let text = {
1206                let a = sb.finish();
1207                if non_text == 0 {
1208                    None
1209                } else {
1210                    Some(Arc::new(a) as ArrayRef)
1211                }
1212            };
1213            let errors = {
1214                let a = eb.finish();
1215                if non_err == 0 {
1216                    None
1217                } else {
1218                    Some(Arc::new(a))
1219                }
1220            };
1221
1222            (
1223                len, tags, numbers, booleans, text, errors, non_num, non_bool, non_text, non_err,
1224            )
1225        };
1226
1227        let Some(ch_mut) = self.columns[col_idx].chunk_mut(ch_idx) else {
1228            return false;
1229        };
1230
1231        ch_mut.type_tag = tags;
1232        ch_mut.numbers = numbers;
1233        ch_mut.booleans = booleans;
1234        ch_mut.text = text;
1235        ch_mut.errors = errors;
1236        ch_mut.overlay.clear();
1237        ch_mut.lowered_text = OnceCell::new();
1238        ch_mut.meta.len = len;
1239        ch_mut.meta.non_null_num = non_num;
1240        ch_mut.meta.non_null_bool = non_bool;
1241        ch_mut.meta.non_null_text = non_text;
1242        ch_mut.meta.non_null_err = non_err;
1243        true
1244    }
1245
1246    /// Insert `count` rows before absolute 0-based row `before`.
1247    pub fn insert_rows(&mut self, before: usize, count: usize) {
1248        if count == 0 {
1249            return;
1250        }
1251
1252        let total_rows = self.nrows as usize;
1253        if total_rows == 0 {
1254            self.nrows = count as u32;
1255            if self.nrows > 0 && self.chunk_starts.is_empty() {
1256                self.chunk_starts.push(0);
1257            }
1258            return;
1259        }
1260
1261        // Ensure a valid chunk map for non-empty sheets.
1262        if self.chunk_starts.is_empty() {
1263            self.chunk_starts.push(0);
1264        }
1265
1266        // "Dense" mode: every column has every chunk (legacy invariant).
1267        let dense_aligned = self
1268            .columns
1269            .iter()
1270            .all(|c| c.sparse_chunks.is_empty() && c.chunks.len() == self.chunk_starts.len());
1271
1272        let insert_at = before.min(total_rows);
1273        let (split_idx, split_off) = if insert_at == total_rows {
1274            // Append at end: split after last chunk.
1275            let last_idx = self.chunk_starts.len() - 1;
1276            let last_start = self.chunk_starts[last_idx];
1277            let last_len = total_rows.saturating_sub(last_start);
1278            (last_idx, last_len)
1279        } else {
1280            self.chunk_of_row(insert_at).unwrap_or((0, 0))
1281        };
1282
1283        if dense_aligned {
1284            // Rebuild chunks for each column (including inserted empty chunk) and recompute starts.
1285            for col in &mut self.columns {
1286                let mut new_chunks: Vec<ColumnChunk> = Vec::with_capacity(col.chunks.len() + 2);
1287                for i in 0..col.chunks.len() {
1288                    if i != split_idx {
1289                        new_chunks.push(col.chunks[i].clone());
1290                    } else {
1291                        let orig = &col.chunks[i];
1292                        let len = orig.type_tag.len();
1293                        if split_off > 0 {
1294                            new_chunks.push(Self::slice_chunk(orig, 0, split_off));
1295                        }
1296                        new_chunks.push(Self::make_empty_chunk(count));
1297                        if split_off < len {
1298                            new_chunks.push(Self::slice_chunk(orig, split_off, len - split_off));
1299                        }
1300                    }
1301                }
1302                col.chunks = new_chunks;
1303                col.sparse_chunks.clear();
1304            }
1305            self.nrows = (total_rows + count) as u32;
1306            self.recompute_chunk_starts();
1307            return;
1308        }
1309
1310        // Sparse-aware mode: `chunk_starts` is authoritative and missing chunks are treated as empty.
1311        #[derive(Clone, Copy)]
1312        enum PlanItem {
1313            Slice {
1314                old_idx: usize,
1315                off: usize,
1316                len: usize,
1317            },
1318            Empty {
1319                len: usize,
1320            },
1321        }
1322
1323        let mut plan: Vec<PlanItem> = Vec::with_capacity(self.chunk_starts.len() + 2);
1324        for old_idx in 0..self.chunk_starts.len() {
1325            let ch_start = self.chunk_starts[old_idx];
1326            let ch_end = self
1327                .chunk_starts
1328                .get(old_idx + 1)
1329                .copied()
1330                .unwrap_or(total_rows);
1331            let ch_len = ch_end.saturating_sub(ch_start);
1332            if ch_len == 0 {
1333                continue;
1334            }
1335
1336            if old_idx != split_idx {
1337                plan.push(PlanItem::Slice {
1338                    old_idx,
1339                    off: 0,
1340                    len: ch_len,
1341                });
1342                continue;
1343            }
1344
1345            let left_len = split_off.min(ch_len);
1346            let right_len = ch_len.saturating_sub(left_len);
1347            if left_len > 0 {
1348                plan.push(PlanItem::Slice {
1349                    old_idx,
1350                    off: 0,
1351                    len: left_len,
1352                });
1353            }
1354            plan.push(PlanItem::Empty { len: count });
1355            if right_len > 0 {
1356                plan.push(PlanItem::Slice {
1357                    old_idx,
1358                    off: left_len,
1359                    len: right_len,
1360                });
1361            }
1362        }
1363
1364        let mut new_starts: Vec<usize> = Vec::with_capacity(plan.len());
1365        let mut cur = 0usize;
1366        for item in &plan {
1367            let len = match *item {
1368                PlanItem::Slice { len, .. } => len,
1369                PlanItem::Empty { len } => len,
1370            };
1371            if len == 0 {
1372                continue;
1373            }
1374            new_starts.push(cur);
1375            cur = cur.saturating_add(len);
1376        }
1377
1378        debug_assert_eq!(cur, total_rows.saturating_add(count));
1379
1380        // Update sheet row layout first.
1381        self.nrows = (total_rows + count) as u32;
1382        self.chunk_starts = new_starts;
1383
1384        // Rebuild stored chunks per column using the plan.
1385        for col in &mut self.columns {
1386            let old_dense = std::mem::take(&mut col.chunks);
1387            let old_sparse = std::mem::take(&mut col.sparse_chunks);
1388            let get_old = |idx: usize| -> Option<&ColumnChunk> {
1389                if idx < old_dense.len() {
1390                    Some(&old_dense[idx])
1391                } else {
1392                    old_sparse.get(&idx)
1393                }
1394            };
1395
1396            let mut dense: Vec<ColumnChunk> = Vec::new();
1397            let mut sparse: FxHashMap<usize, ColumnChunk> = FxHashMap::default();
1398            let mut dense_prefix = true;
1399
1400            for (new_idx, item) in plan.iter().enumerate() {
1401                let produced: Option<ColumnChunk> = match *item {
1402                    PlanItem::Empty { .. } => None,
1403                    PlanItem::Slice { old_idx, off, len } => match get_old(old_idx) {
1404                        Some(orig) => {
1405                            if off == 0 && len == orig.type_tag.len() {
1406                                Some(orig.clone())
1407                            } else {
1408                                Some(Self::slice_chunk(orig, off, len))
1409                            }
1410                        }
1411                        None => None,
1412                    },
1413                };
1414
1415                if let Some(ch) = produced {
1416                    if dense_prefix && new_idx == dense.len() {
1417                        dense.push(ch);
1418                    } else {
1419                        sparse.insert(new_idx, ch);
1420                        dense_prefix = false;
1421                    }
1422                } else if dense_prefix && new_idx == dense.len() {
1423                    dense_prefix = false;
1424                }
1425            }
1426
1427            col.chunks = dense;
1428            col.sparse_chunks = sparse;
1429        }
1430    }
1431
1432    /// Delete `count` rows starting from absolute 0-based row `start`.
1433    pub fn delete_rows(&mut self, start: usize, count: usize) {
1434        if count == 0 || self.nrows == 0 {
1435            return;
1436        }
1437
1438        let total_rows = self.nrows as usize;
1439        if start >= total_rows {
1440            return;
1441        }
1442        let end = (start + count).min(total_rows);
1443        let del_len = end.saturating_sub(start);
1444        if del_len == 0 {
1445            return;
1446        }
1447
1448        // Ensure a valid chunk map for non-empty sheets.
1449        if total_rows > 0 && self.chunk_starts.is_empty() {
1450            self.chunk_starts.push(0);
1451        }
1452
1453        // "Dense" mode: every column has every chunk (legacy invariant).
1454        let dense_aligned = self
1455            .columns
1456            .iter()
1457            .all(|c| c.sparse_chunks.is_empty() && c.chunks.len() == self.chunk_starts.len());
1458
1459        if dense_aligned {
1460            // Dense rebuild by slicing out the deleted window.
1461            for col in &mut self.columns {
1462                let mut new_chunks: Vec<ColumnChunk> = Vec::new();
1463                let mut cur_start = 0usize;
1464                for ch in &col.chunks {
1465                    let len = ch.type_tag.len();
1466                    let ch_end = cur_start + len;
1467                    // No overlap
1468                    if ch_end <= start || cur_start >= end {
1469                        new_chunks.push(ch.clone());
1470                    } else {
1471                        // Overlap exists
1472                        let del_start = start.max(cur_start);
1473                        let del_end = end.min(ch_end);
1474                        let left_len = del_start.saturating_sub(cur_start);
1475                        let right_len = ch_end.saturating_sub(del_end);
1476                        if left_len > 0 {
1477                            new_chunks.push(Self::slice_chunk(ch, 0, left_len));
1478                        }
1479                        if right_len > 0 {
1480                            let off = len - right_len;
1481                            new_chunks.push(Self::slice_chunk(ch, off, right_len));
1482                        }
1483                    }
1484                    cur_start = ch_end;
1485                }
1486                col.chunks = new_chunks;
1487                col.sparse_chunks.clear();
1488            }
1489            self.nrows = (total_rows - del_len) as u32;
1490            self.recompute_chunk_starts();
1491            return;
1492        }
1493
1494        // Sparse-aware mode: `chunk_starts` is authoritative and missing chunks are treated as empty.
1495        #[derive(Clone, Copy)]
1496        enum PlanItem {
1497            Slice {
1498                old_idx: usize,
1499                off: usize,
1500                len: usize,
1501            },
1502        }
1503
1504        let mut plan: Vec<PlanItem> = Vec::with_capacity(self.chunk_starts.len());
1505        for old_idx in 0..self.chunk_starts.len() {
1506            let ch_start = self.chunk_starts[old_idx];
1507            let ch_end = self
1508                .chunk_starts
1509                .get(old_idx + 1)
1510                .copied()
1511                .unwrap_or(total_rows);
1512            let ch_len = ch_end.saturating_sub(ch_start);
1513            if ch_len == 0 {
1514                continue;
1515            }
1516
1517            // No overlap
1518            if ch_end <= start || ch_start >= end {
1519                plan.push(PlanItem::Slice {
1520                    old_idx,
1521                    off: 0,
1522                    len: ch_len,
1523                });
1524                continue;
1525            }
1526
1527            // Left remainder
1528            if start > ch_start {
1529                let left_end = start.min(ch_end);
1530                let left_len = left_end.saturating_sub(ch_start);
1531                if left_len > 0 {
1532                    plan.push(PlanItem::Slice {
1533                        old_idx,
1534                        off: 0,
1535                        len: left_len,
1536                    });
1537                }
1538            }
1539
1540            // Right remainder
1541            if end < ch_end {
1542                let right_off = end.saturating_sub(ch_start);
1543                let right_len = ch_end.saturating_sub(end);
1544                if right_len > 0 {
1545                    plan.push(PlanItem::Slice {
1546                        old_idx,
1547                        off: right_off,
1548                        len: right_len,
1549                    });
1550                }
1551            }
1552        }
1553
1554        let mut new_starts: Vec<usize> = Vec::with_capacity(plan.len());
1555        let mut cur = 0usize;
1556        for item in &plan {
1557            let len = match *item {
1558                PlanItem::Slice { len, .. } => len,
1559            };
1560            if len == 0 {
1561                continue;
1562            }
1563            new_starts.push(cur);
1564            cur = cur.saturating_add(len);
1565        }
1566
1567        debug_assert_eq!(cur, total_rows.saturating_sub(del_len));
1568
1569        // Update sheet row layout first.
1570        self.nrows = (total_rows - del_len) as u32;
1571        self.chunk_starts = new_starts;
1572
1573        // Rebuild stored chunks per column using the plan.
1574        for col in &mut self.columns {
1575            let old_dense = std::mem::take(&mut col.chunks);
1576            let old_sparse = std::mem::take(&mut col.sparse_chunks);
1577            let get_old = |idx: usize| -> Option<&ColumnChunk> {
1578                if idx < old_dense.len() {
1579                    Some(&old_dense[idx])
1580                } else {
1581                    old_sparse.get(&idx)
1582                }
1583            };
1584
1585            let mut dense: Vec<ColumnChunk> = Vec::new();
1586            let mut sparse: FxHashMap<usize, ColumnChunk> = FxHashMap::default();
1587            let mut dense_prefix = true;
1588
1589            for (new_idx, item) in plan.iter().enumerate() {
1590                let produced: Option<ColumnChunk> = match *item {
1591                    PlanItem::Slice { old_idx, off, len } => match get_old(old_idx) {
1592                        Some(orig) => {
1593                            if off == 0 && len == orig.type_tag.len() {
1594                                Some(orig.clone())
1595                            } else {
1596                                Some(Self::slice_chunk(orig, off, len))
1597                            }
1598                        }
1599                        None => None,
1600                    },
1601                };
1602
1603                if let Some(ch) = produced {
1604                    if dense_prefix && new_idx == dense.len() {
1605                        dense.push(ch);
1606                    } else {
1607                        sparse.insert(new_idx, ch);
1608                        dense_prefix = false;
1609                    }
1610                } else if dense_prefix && new_idx == dense.len() {
1611                    dense_prefix = false;
1612                }
1613            }
1614
1615            col.chunks = dense;
1616            col.sparse_chunks = sparse;
1617        }
1618    }
1619
1620    /// Insert `count` columns before absolute 0-based column `before` with empty chunks.
1621    pub fn insert_columns(&mut self, before: usize, count: usize) {
1622        if count == 0 {
1623            return;
1624        }
1625        // Determine chunk schema from first column if present
1626        let empty_col = |lens: &[usize]| -> ArrowColumn {
1627            let mut chunks = Vec::with_capacity(lens.len());
1628            for &l in lens {
1629                chunks.push(Self::make_empty_chunk(l));
1630            }
1631            ArrowColumn {
1632                chunks,
1633                sparse_chunks: FxHashMap::default(),
1634                index: 0,
1635            }
1636        };
1637        let dense_aligned = !self.columns.is_empty()
1638            && self
1639                .columns
1640                .iter()
1641                .all(|c| c.sparse_chunks.is_empty() && c.chunks.len() == self.chunk_starts.len());
1642
1643        let lens: Vec<usize> = if dense_aligned {
1644            self.columns[0]
1645                .chunks
1646                .iter()
1647                .map(|c| c.type_tag.len())
1648                .collect()
1649        } else if self.columns.is_empty() {
1650            // No columns: single chunk matching nrows if any
1651            if self.nrows > 0 {
1652                vec![self.nrows as usize]
1653            } else {
1654                Vec::new()
1655            }
1656        } else {
1657            // Sparse sheet: keep inserted columns cheap by materializing no chunks.
1658            Vec::new()
1659        };
1660        let mut cols_new: Vec<ArrowColumn> = Vec::with_capacity(self.columns.len() + count);
1661        let before_idx = before.min(self.columns.len());
1662        for (i, col) in self.columns.iter_mut().enumerate() {
1663            if i == before_idx {
1664                for _ in 0..count {
1665                    cols_new.push(empty_col(&lens));
1666                }
1667            }
1668            cols_new.push(col.clone());
1669        }
1670        if before_idx == self.columns.len() {
1671            for _ in 0..count {
1672                cols_new.push(empty_col(&lens));
1673            }
1674        }
1675        // Fix column indices
1676        for (idx, col) in cols_new.iter_mut().enumerate() {
1677            col.index = idx as u32;
1678        }
1679        self.columns = cols_new;
1680        // chunk_starts unchanged; lens were matched
1681    }
1682
1683    /// Delete `count` columns starting at absolute 0-based column `start`.
1684    pub fn delete_columns(&mut self, start: usize, count: usize) {
1685        if count == 0 || self.columns.is_empty() {
1686            return;
1687        }
1688        let end = (start + count).min(self.columns.len());
1689        if start >= end {
1690            return;
1691        }
1692        self.columns.drain(start..end);
1693        for (idx, col) in self.columns.iter_mut().enumerate() {
1694            col.index = idx as u32;
1695        }
1696    }
1697}
1698
/// Small `Copy` summary record for a single column.
///
/// NOTE(review): the code that populates this struct is outside this section, so the
/// field descriptions below are inferred from the field names — confirm against the
/// producer before relying on them.
#[derive(Debug, Clone, Copy)]
pub struct ColumnShape {
    /// Column index (presumably the column's 0-based position — TODO confirm).
    pub index: u32,
    /// Number of chunks (presumably stored chunks for this column — TODO confirm).
    pub chunks: usize,
    /// Row count (presumably total rows covered by the column — TODO confirm).
    pub rows: usize,
    /// Presumably set when the column has a numbers lane — TODO confirm.
    pub has_num: bool,
    /// Presumably set when the column has a booleans lane — TODO confirm.
    pub has_bool: bool,
    /// Presumably set when the column has a text lane — TODO confirm.
    pub has_text: bool,
    /// Presumably set when the column has an errors lane — TODO confirm.
    pub has_err: bool,
}
1709
1710#[cfg(test)]
1711mod tests {
1712    use super::*;
1713    use arrow_array::Array;
1714    use arrow_schema::DataType;
1715
1716    #[test]
1717    fn ingest_mixed_rows_into_lanes_and_tags() {
1718        let mut b = IngestBuilder::new("Sheet1", 1, 1024, crate::engine::DateSystem::Excel1900);
1719        let data = vec![
1720            LiteralValue::Number(42.5),                   // Number
1721            LiteralValue::Empty,                          // Empty
1722            LiteralValue::Text(String::new()),            // Empty text (Text lane)
1723            LiteralValue::Boolean(true),                  // Boolean
1724            LiteralValue::Error(ExcelError::new_value()), // Error
1725        ];
1726        for v in &data {
1727            b.append_row(std::slice::from_ref(v)).unwrap();
1728        }
1729        let sheet = b.finish();
1730        assert_eq!(sheet.nrows, 5);
1731        assert_eq!(sheet.columns.len(), 1);
1732        assert_eq!(sheet.columns[0].chunks.len(), 1);
1733        let ch = &sheet.columns[0].chunks[0];
1734
1735        // Type tags
1736        let tags = ch.type_tag.values();
1737        assert_eq!(tags.len(), 5);
1738        assert_eq!(tags[0], TypeTag::Number as u8);
1739        assert_eq!(tags[1], TypeTag::Empty as u8);
1740        assert_eq!(tags[2], TypeTag::Text as u8);
1741        assert_eq!(tags[3], TypeTag::Boolean as u8);
1742        assert_eq!(tags[4], TypeTag::Error as u8);
1743
1744        // Numbers lane validity
1745        let nums = ch.numbers.as_ref().unwrap();
1746        assert_eq!(nums.len(), 5);
1747        assert_eq!(nums.null_count(), 4);
1748        assert!(nums.is_valid(0));
1749
1750        // Booleans lane validity
1751        let bools = ch.booleans.as_ref().unwrap();
1752        assert_eq!(bools.len(), 5);
1753        assert_eq!(bools.null_count(), 4);
1754        assert!(bools.is_valid(3));
1755
1756        // Text lane validity
1757        let txt = ch.text.as_ref().unwrap();
1758        assert_eq!(txt.len(), 5);
1759        assert_eq!(txt.null_count(), 4);
1760        assert!(txt.is_valid(2)); // ""
1761
1762        // Errors lane
1763        let errs = ch.errors.as_ref().unwrap();
1764        assert_eq!(errs.len(), 5);
1765        assert_eq!(errs.null_count(), 4);
1766        assert!(errs.is_valid(4));
1767    }
1768
1769    #[test]
1770    fn range_view_get_cell_and_padding() {
1771        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
1772        b.append_row(&[LiteralValue::Number(1.0), LiteralValue::Text("".into())])
1773            .unwrap();
1774        b.append_row(&[LiteralValue::Empty, LiteralValue::Text("x".into())])
1775            .unwrap();
1776        b.append_row(&[LiteralValue::Boolean(true), LiteralValue::Empty])
1777            .unwrap();
1778        let sheet = b.finish();
1779        let rv = sheet.range_view(0, 0, 2, 1);
1780        assert_eq!(rv.dims(), (3, 2));
1781        // Inside
1782        assert_eq!(rv.get_cell(0, 0), LiteralValue::Number(1.0));
1783        assert_eq!(rv.get_cell(0, 1), LiteralValue::Text(String::new())); // empty string
1784        assert_eq!(rv.get_cell(1, 0), LiteralValue::Empty); // truly Empty
1785        assert_eq!(rv.get_cell(2, 0), LiteralValue::Boolean(true));
1786        // OOB padding
1787        assert_eq!(rv.get_cell(3, 0), LiteralValue::Empty);
1788        assert_eq!(rv.get_cell(0, 2), LiteralValue::Empty);
1789
1790        // Numbers slices should produce one 2-row and one 1-row segment
1791        let nums: Vec<_> = rv.numbers_slices().map(|r| r.unwrap()).collect();
1792        assert_eq!(nums.len(), 2);
1793        assert_eq!(nums[0].0, 0);
1794        assert_eq!(nums[0].1, 2);
1795        assert_eq!(nums[1].0, 2);
1796        assert_eq!(nums[1].1, 1);
1797    }
1798
1799    #[test]
1800    fn overlay_precedence_user_over_computed() {
1801        let mut b = IngestBuilder::new("S", 1, 8, crate::engine::DateSystem::Excel1900);
1802        b.append_row(&[LiteralValue::Number(1.0)]).unwrap();
1803        b.append_row(&[LiteralValue::Empty]).unwrap();
1804        b.append_row(&[LiteralValue::Empty]).unwrap();
1805        let mut sheet = b.finish();
1806
1807        let (ch_i, off) = sheet.chunk_of_row(0).unwrap();
1808        sheet.columns[0].chunks[ch_i]
1809            .computed_overlay
1810            .set(off, OverlayValue::Number(2.0));
1811
1812        let rv0 = sheet.range_view(0, 0, 0, 0);
1813        assert_eq!(rv0.get_cell(0, 0), LiteralValue::Number(2.0));
1814        let nums0: Vec<_> = rv0.numbers_slices().map(|r| r.unwrap()).collect();
1815        assert_eq!(nums0.len(), 1);
1816        assert_eq!(nums0[0].2[0].value(0), 2.0);
1817
1818        sheet.columns[0].chunks[ch_i]
1819            .overlay
1820            .set(off, OverlayValue::Number(3.0));
1821
1822        let rv1 = sheet.range_view(0, 0, 0, 0);
1823        assert_eq!(rv1.get_cell(0, 0), LiteralValue::Number(3.0));
1824        let nums1: Vec<_> = rv1.numbers_slices().map(|r| r.unwrap()).collect();
1825        assert_eq!(nums1.len(), 1);
1826        assert_eq!(nums1[0].2[0].value(0), 3.0);
1827    }
1828
1829    #[test]
1830    fn row_chunk_slices_shape() {
1831        // chunk_rows=2 leads to two slices for 3 rows
1832        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
1833        b.append_row(&[LiteralValue::Text("a".into()), LiteralValue::Number(1.0)])
1834            .unwrap();
1835        b.append_row(&[LiteralValue::Text("b".into()), LiteralValue::Number(2.0)])
1836            .unwrap();
1837        b.append_row(&[LiteralValue::Text("c".into()), LiteralValue::Number(3.0)])
1838            .unwrap();
1839        let sheet = b.finish();
1840        let rv = sheet.range_view(0, 0, 2, 1);
1841        let slices: Vec<_> = rv.iter_row_chunks().map(|r| r.unwrap()).collect();
1842        assert_eq!(slices.len(), 2);
1843        assert_eq!(slices[0].row_start, 0);
1844        assert_eq!(slices[0].row_len, 2);
1845        assert_eq!(slices[0].cols.len(), 2);
1846        assert_eq!(slices[1].row_start, 2);
1847        assert_eq!(slices[1].row_len, 1);
1848        assert_eq!(slices[1].cols.len(), 2);
1849    }
1850
1851    #[test]
1852    fn oob_columns_are_padded() {
1853        // Build with 2 columns; request 3 columns (ec beyond last col)
1854        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
1855        b.append_row(&[LiteralValue::Number(1.0), LiteralValue::Text("a".into())])
1856            .unwrap();
1857        b.append_row(&[LiteralValue::Number(2.0), LiteralValue::Text("b".into())])
1858            .unwrap();
1859        let sheet = b.finish();
1860        // Request cols [0..=2] → 3 columns with padding
1861        let rv = sheet.range_view(0, 0, 1, 2);
1862        assert_eq!(rv.dims(), (2, 3));
1863        let slices: Vec<_> = rv.iter_row_chunks().map(|r| r.unwrap()).collect();
1864        assert!(!slices.is_empty());
1865        for cs in &slices {
1866            assert_eq!(cs.cols.len(), 3);
1867        }
1868        // Also validate typed slices return 3 entries per segment
1869        for res in rv.numbers_slices() {
1870            let (_rs, _rl, cols) = res.unwrap();
1871            assert_eq!(cols.len(), 3);
1872        }
1873        for res in rv.booleans_slices() {
1874            let (_rs, _rl, cols) = res.unwrap();
1875            assert_eq!(cols.len(), 3);
1876        }
1877        for res in rv.text_slices() {
1878            let (_rs, _rl, cols) = res.unwrap();
1879            assert_eq!(cols.len(), 3);
1880        }
1881        for res in rv.errors_slices() {
1882            let (_rs, _rl, cols) = res.unwrap();
1883            assert_eq!(cols.len(), 3);
1884        }
1885        for res in rv.lowered_text_slices() {
1886            let (_rs, _rl, cols) = res.unwrap();
1887            assert_eq!(cols.len(), 3);
1888        }
1889    }
1890
1891    #[test]
1892    fn reversed_range_is_empty() {
1893        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
1894        b.append_row(&[LiteralValue::Number(1.0)]).unwrap();
1895        b.append_row(&[LiteralValue::Number(2.0)]).unwrap();
1896        let sheet = b.finish();
1897        let rv = sheet.range_view(3, 0, 1, 0); // er < sr
1898        assert_eq!(rv.dims(), (0, 0));
1899        assert!(rv.iter_row_chunks().next().is_none());
1900        assert_eq!(rv.get_cell(0, 0), LiteralValue::Empty);
1901    }
1902
1903    #[test]
1904    fn chunk_alignment_invariant() {
1905        let mut b = IngestBuilder::new("S", 3, 2, crate::engine::DateSystem::Excel1900);
1906        // 5 rows, 2-row chunks => 3 chunks (2,2,1)
1907        for r in 0..5 {
1908            b.append_row(&[
1909                LiteralValue::Number(r as f64),
1910                LiteralValue::Text(format!("{r}")),
1911                if r % 2 == 0 {
1912                    LiteralValue::Empty
1913                } else {
1914                    LiteralValue::Boolean(true)
1915                },
1916            ])
1917            .unwrap();
1918        }
1919        let sheet = b.finish();
1920        // chunk_starts should be [0,2,4]
1921        assert_eq!(sheet.chunk_starts, vec![0, 2, 4]);
1922        // All columns must share per-chunk lengths equal to [2,2,1]
1923        let lens0: Vec<usize> = sheet.columns[0]
1924            .chunks
1925            .iter()
1926            .map(|ch| ch.type_tag.len())
1927            .collect();
1928        for col in &sheet.columns[1..] {
1929            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
1930            assert_eq!(lens, lens0);
1931        }
1932    }
1933
1934    #[test]
1935    fn chunking_splits_rows() {
1936        // Two columns, chunk size 2 → expect two chunks
1937        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
1938        let rows = vec![
1939            vec![LiteralValue::Number(1.0), LiteralValue::Text("a".into())],
1940            vec![LiteralValue::Empty, LiteralValue::Text("b".into())],
1941            vec![LiteralValue::Boolean(true), LiteralValue::Empty],
1942        ];
1943        for r in rows {
1944            b.append_row(&r).unwrap();
1945        }
1946        let sheet = b.finish();
1947        assert_eq!(sheet.columns[0].chunks.len(), 2);
1948        assert_eq!(sheet.columns[1].chunks.len(), 2);
1949        assert_eq!(sheet.columns[0].chunks[0].numbers_or_null().len(), 2);
1950        assert_eq!(sheet.columns[0].chunks[1].numbers_or_null().len(), 1);
1951    }
1952
1953    #[test]
1954    fn pending_is_not_error() {
1955        let mut b = IngestBuilder::new("S", 1, 8, crate::engine::DateSystem::Excel1900);
1956        b.append_row(&[LiteralValue::Pending]).unwrap();
1957        let sheet = b.finish();
1958        let ch = &sheet.columns[0].chunks[0];
1959        // tag is Pending
1960        assert_eq!(ch.type_tag.values()[0], super::TypeTag::Pending as u8);
1961        // errors lane is effectively null
1962        let errs = ch.errors_or_null();
1963        assert_eq!(errs.null_count(), 1);
1964    }
1965
1966    #[test]
1967    fn all_null_numeric_lane_uses_null_array() {
1968        // Only text values in first column → numbers lane should be all null with correct dtype
1969        let mut b = IngestBuilder::new("S", 1, 16, crate::engine::DateSystem::Excel1900);
1970        b.append_row(&[LiteralValue::Text("a".into())]).unwrap();
1971        b.append_row(&[LiteralValue::Text("".into())]).unwrap();
1972        b.append_row(&[LiteralValue::Text("b".into())]).unwrap();
1973        let sheet = b.finish();
1974        let ch = &sheet.columns[0].chunks[0];
1975        let nums = ch.numbers_or_null();
1976        assert_eq!(nums.len(), 3);
1977        assert_eq!(nums.null_count(), 3);
1978        assert_eq!(nums.data_type(), &DataType::Float64);
1979    }
1980
1981    #[test]
1982    fn row_insert_delete_across_chunk_boundaries_with_overlays() {
1983        // Build 1 column, chunk size 4, 10 rows -> chunks at [0..4],[4..8],[8..10]
1984        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
1985        for _ in 0..10 {
1986            b.append_row(&[LiteralValue::Empty]).unwrap();
1987        }
1988        let mut sheet = b.finish();
1989        // Add overlays at row 3 and row 4
1990        {
1991            let (c0, o0) = sheet.chunk_of_row(3).unwrap();
1992            sheet.columns[0].chunks[c0]
1993                .overlay
1994                .set(o0, OverlayValue::Number(30.0));
1995            let (c1, o1) = sheet.chunk_of_row(4).unwrap();
1996            sheet.columns[0].chunks[c1]
1997                .overlay
1998                .set(o1, OverlayValue::Number(40.0));
1999        }
2000        // Insert 2 rows before row 4 (at chunk boundary)
2001        sheet.insert_rows(4, 2);
2002        assert_eq!(sheet.nrows, 12);
2003        // Validate overlays moved correctly: 3 stays, 4 becomes Empty, 6 has 40
2004        let av = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2005        assert_eq!(av.get_cell(3, 0), LiteralValue::Number(30.0));
2006        assert_eq!(av.get_cell(4, 0), LiteralValue::Empty);
2007        assert_eq!(av.get_cell(6, 0), LiteralValue::Number(40.0));
2008
2009        // Now delete 3 rows starting at 3: removes rows 3,4,5 → moves 40.0 from 6 → 3
2010        sheet.delete_rows(3, 3);
2011        assert_eq!(sheet.nrows, 9);
2012        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2013        assert_eq!(av2.get_cell(3, 0), LiteralValue::Number(40.0));
2014        // All columns share chunk lengths; chunk_starts monotonic and cover nrows
2015        let lens0: Vec<usize> = sheet.columns[0]
2016            .chunks
2017            .iter()
2018            .map(|ch| ch.type_tag.len())
2019            .collect();
2020        for col in &sheet.columns {
2021            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2022            assert_eq!(lens, lens0);
2023        }
2024        // chunk_starts should be monotonic and final chunk end == nrows
2025        assert!(sheet.chunk_starts.windows(2).all(|w| w[0] < w[1]));
2026        let last_start = *sheet.chunk_starts.last().unwrap_or(&0);
2027        let last_len = sheet.columns[0]
2028            .chunks
2029            .last()
2030            .map(|c| c.type_tag.len())
2031            .unwrap_or(0);
2032        assert_eq!(last_start + last_len, sheet.nrows as usize);
2033    }
2034
2035    #[test]
2036    fn column_insert_delete_retains_chunk_alignment() {
2037        let mut b = IngestBuilder::new("S", 3, 3, crate::engine::DateSystem::Excel1900);
2038        for _ in 0..5 {
2039            b.append_row(&[
2040                LiteralValue::Empty,
2041                LiteralValue::Empty,
2042                LiteralValue::Empty,
2043            ])
2044            .unwrap();
2045        }
2046        let mut sheet = b.finish();
2047        // Record reference chunk lengths of first column
2048        let ref_lens: Vec<usize> = sheet.columns[0]
2049            .chunks
2050            .iter()
2051            .map(|ch| ch.type_tag.len())
2052            .collect();
2053        // Insert 2 columns before index 1
2054        sheet.insert_columns(1, 2);
2055        assert_eq!(sheet.columns.len(), 5);
2056        for col in &sheet.columns {
2057            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2058            assert_eq!(lens, ref_lens);
2059        }
2060        let starts_before = sheet.chunk_starts.clone();
2061        // Delete 2 columns starting at index 2 → back to 3 columns
2062        sheet.delete_columns(2, 2);
2063        assert_eq!(sheet.columns.len(), 3);
2064        for col in &sheet.columns {
2065            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2066            assert_eq!(lens, ref_lens);
2067        }
2068        // chunk_starts unchanged by column operations
2069        assert_eq!(sheet.chunk_starts, starts_before);
2070    }
2071
2072    #[test]
2073    fn multiple_adjacent_row_ops_overlay_mixed_types() {
2074        use formualizer_common::ExcelErrorKind;
2075        // Two columns to ensure alignment preserved across columns
2076        let mut b = IngestBuilder::new("S", 2, 3, crate::engine::DateSystem::Excel1900);
2077        for _ in 0..9 {
2078            b.append_row(&[LiteralValue::Empty, LiteralValue::Empty])
2079                .unwrap();
2080        }
2081        let mut sheet = b.finish();
2082        // Overlays at rows (0-based): 2->Number, 3->Text, 5->Boolean, 6->Error, 8->Empty
2083        // Column 0 only
2084        let set_ov = |sh: &mut ArrowSheet, row: usize, ov: OverlayValue| {
2085            let (ch_i, off) = sh.chunk_of_row(row).unwrap();
2086            sh.columns[0].chunks[ch_i].overlay.set(off, ov);
2087        };
2088        set_ov(&mut sheet, 2, OverlayValue::Number(12.5));
2089        set_ov(&mut sheet, 3, OverlayValue::Text(Arc::from("hello")));
2090        set_ov(&mut sheet, 5, OverlayValue::Boolean(true));
2091        set_ov(
2092            &mut sheet,
2093            6,
2094            OverlayValue::Error(map_error_code(ExcelErrorKind::Div)),
2095        );
2096        set_ov(&mut sheet, 8, OverlayValue::Empty);
2097
2098        // Insert 1 row before index 3
2099        sheet.insert_rows(3, 1);
2100        // Expected new positions: 2->2 (unchanged), 3->4, 5->6, 6->7, 8->9
2101        let av1 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2102        assert_eq!(av1.get_cell(2, 0), LiteralValue::Number(12.5));
2103        assert_eq!(av1.get_cell(4, 0), LiteralValue::Text("hello".into()));
2104        assert_eq!(av1.get_cell(6, 0), LiteralValue::Boolean(true));
2105        match av1.get_cell(7, 0) {
2106            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
2107            other => panic!("expected error at row 7, got {other:?}"),
2108        }
2109        assert_eq!(av1.get_cell(9, 0), LiteralValue::Empty);
2110
2111        // Insert 2 rows before index 4 (adjacent to previous region)
2112        sheet.insert_rows(4, 2);
2113        // Now positions: 2->2, 4->6, 6->8, 7->9, 9->11
2114        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2115        assert_eq!(av2.get_cell(2, 0), LiteralValue::Number(12.5));
2116        assert_eq!(av2.get_cell(6, 0), LiteralValue::Text("hello".into()));
2117        assert_eq!(av2.get_cell(8, 0), LiteralValue::Boolean(true));
2118        match av2.get_cell(9, 0) {
2119            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
2120            other => panic!("expected error at row 9, got {other:?}"),
2121        }
2122        assert_eq!(av2.get_cell(11, 0), LiteralValue::Empty);
2123
2124        // Delete 2 rows starting at index 6 → removes the text at 6 and one empty row
2125        sheet.delete_rows(6, 2);
2126        let av3 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
2127        // Remaining expected: 2->Number 12.5, 6 (was 8)->true, 7 (was 9)->#DIV/0!, 9 (was 11)->Empty
2128        assert_eq!(av3.get_cell(2, 0), LiteralValue::Number(12.5));
2129        assert_eq!(av3.get_cell(6, 0), LiteralValue::Boolean(true));
2130        match av3.get_cell(7, 0) {
2131            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
2132            other => panic!("expected error at row 8, got {other:?}"),
2133        }
2134        assert_eq!(av3.get_cell(9, 0), LiteralValue::Empty);
2135
2136        // Alignment checks
2137        let lens0: Vec<usize> = sheet.columns[0]
2138            .chunks
2139            .iter()
2140            .map(|ch| ch.type_tag.len())
2141            .collect();
2142        for col in &sheet.columns {
2143            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2144            assert_eq!(lens, lens0);
2145        }
2146        // chunk_starts monotonically increasing and cover nrows
2147        assert!(sheet.chunk_starts.windows(2).all(|w| w[0] < w[1]));
2148        let last_start = *sheet.chunk_starts.last().unwrap_or(&0);
2149        let last_len = sheet.columns[0]
2150            .chunks
2151            .last()
2152            .map(|c| c.type_tag.len())
2153            .unwrap_or(0);
2154        assert_eq!(last_start + last_len, sheet.nrows as usize);
2155    }
2156
2157    #[test]
2158    fn multiple_adjacent_column_ops_alignment() {
2159        // Start with 2 columns, chunk_rows=2, rows=5
2160        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
2161        for _ in 0..5 {
2162            b.append_row(&[LiteralValue::Empty, LiteralValue::Empty])
2163                .unwrap();
2164        }
2165        let mut sheet = b.finish();
2166        let ref_lens: Vec<usize> = sheet.columns[0]
2167            .chunks
2168            .iter()
2169            .map(|ch| ch.type_tag.len())
2170            .collect();
2171        // Insert 1 at start, then 2 at index 2 → columns = 5
2172        sheet.insert_columns(0, 1);
2173        sheet.insert_columns(2, 2);
2174        assert_eq!(sheet.columns.len(), 5);
2175        for col in &sheet.columns {
2176            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2177            assert_eq!(lens, ref_lens);
2178        }
2179        let starts_before = sheet.chunk_starts.clone();
2180        // Delete 1 at index 1, then 2 at the end if available
2181        sheet.delete_columns(1, 1);
2182        let remain = sheet.columns.len();
2183        if remain >= 3 {
2184            sheet.delete_columns(remain - 2, 2);
2185        }
2186        for col in &sheet.columns {
2187            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2188            assert_eq!(lens, ref_lens);
2189        }
2190        assert_eq!(sheet.chunk_starts, starts_before);
2191    }
2192
2193    #[test]
2194    fn overlays_on_multiple_columns_row_col_ops() {
2195        // 3 columns, chunk_rows=3, rows=6 → chunks [0..3), [3..6)
2196        let mut b = IngestBuilder::new("S", 3, 3, crate::engine::DateSystem::Excel1900);
2197        for _ in 0..6 {
2198            b.append_row(&[
2199                LiteralValue::Empty,
2200                LiteralValue::Empty,
2201                LiteralValue::Empty,
2202            ])
2203            .unwrap();
2204        }
2205        let mut sheet = b.finish();
2206        // Overlays at row2 and row3 across columns with different types
2207        let set_ov = |sh: &mut ArrowSheet, col: usize, row: usize, ov: OverlayValue| {
2208            let (ch_i, off) = sh.chunk_of_row(row).unwrap();
2209            sh.columns[col].chunks[ch_i].overlay.set(off, ov);
2210        };
2211        set_ov(&mut sheet, 0, 2, OverlayValue::Number(12.0));
2212        set_ov(&mut sheet, 1, 2, OverlayValue::Text(Arc::from("xx")));
2213        set_ov(&mut sheet, 2, 2, OverlayValue::Boolean(true));
2214        set_ov(&mut sheet, 0, 3, OverlayValue::Number(33.0));
2215        set_ov(&mut sheet, 1, 3, OverlayValue::Text(Arc::from("yy")));
2216        set_ov(&mut sheet, 2, 3, OverlayValue::Boolean(false));
2217
2218        // Insert a row at boundary (before row index 3)
2219        sheet.insert_rows(3, 1);
2220        // Now original row>=3 shift down by 1
2221        let av = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 2);
2222        // Row 2 values unchanged
2223        assert_eq!(av.get_cell(2, 0), LiteralValue::Number(12.0));
2224        assert_eq!(av.get_cell(2, 1), LiteralValue::Text("xx".into()));
2225        assert_eq!(av.get_cell(2, 2), LiteralValue::Boolean(true));
2226        // Row 3 became Empty (inserted)
2227        assert_eq!(av.get_cell(3, 0), LiteralValue::Empty);
2228        // Row 4 holds old row 3 overlays
2229        assert_eq!(av.get_cell(4, 0), LiteralValue::Number(33.0));
2230        assert_eq!(av.get_cell(4, 1), LiteralValue::Text("yy".into()));
2231        assert_eq!(av.get_cell(4, 2), LiteralValue::Boolean(false));
2232
2233        // Delete column 1 (middle), values shift left
2234        sheet.delete_columns(1, 1);
2235        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 1);
2236        assert_eq!(av2.get_cell(2, 0), LiteralValue::Number(12.0));
2237        // Column 1 now was old column 2
2238        assert_eq!(av2.get_cell(2, 1), LiteralValue::Boolean(true));
2239        assert_eq!(av2.get_cell(4, 0), LiteralValue::Number(33.0));
2240        assert_eq!(av2.get_cell(4, 1), LiteralValue::Boolean(false));
2241
2242        // Alignment preserved
2243        let lens0: Vec<usize> = sheet.columns[0]
2244            .chunks
2245            .iter()
2246            .map(|ch| ch.type_tag.len())
2247            .collect();
2248        for col in &sheet.columns {
2249            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
2250            assert_eq!(lens, lens0);
2251        }
2252    }
2253
2254    #[test]
2255    fn effective_slices_overlay_precedence_numbers_text() {
2256        // 1 column, chunk_rows=3, rows=6. Base numbers in lane; overlays include text on row1 and number on row4.
2257        let mut b = IngestBuilder::new("S", 1, 3, crate::engine::DateSystem::Excel1900);
2258        for i in 0..6 {
2259            b.append_row(&[LiteralValue::Number((i + 1) as f64)])
2260                .unwrap();
2261        }
2262        let mut sheet = b.finish();
2263        // Overlays: row1 -> Text("X"), row4 -> Number(99)
2264        let (c1, o1) = sheet.chunk_of_row(1).unwrap();
2265        sheet.columns[0].chunks[c1]
2266            .overlay
2267            .set(o1, OverlayValue::Text(Arc::from("X")));
2268        let (c4, o4) = sheet.chunk_of_row(4).unwrap();
2269        sheet.columns[0].chunks[c4]
2270            .overlay
2271            .set(o4, OverlayValue::Number(99.0));
2272
2273        let av = sheet.range_view(0, 0, 5, 0);
2274        // Validate numbers_slices: row1 should be null (text overlay), row4 should be 99.0, others base
2275        let mut numeric: Vec<Option<f64>> = vec![None; 6];
2276        for res in av.numbers_slices() {
2277            let (row_start, row_len, cols) = res.unwrap();
2278            let a = &cols[0];
2279            for i in 0..row_len {
2280                let idx = row_start + i;
2281                numeric[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
2282            }
2283        }
2284        assert_eq!(numeric[0], Some(1.0));
2285        assert_eq!(numeric[1], None); // overshadowed by text overlay
2286        assert_eq!(numeric[2], Some(3.0));
2287        assert_eq!(numeric[3], Some(4.0));
2288        assert_eq!(numeric[4], Some(99.0));
2289        assert_eq!(numeric[5], Some(6.0));
2290
2291        // Validate text_slices: row1 has "X", others null
2292        let mut texts: Vec<Option<String>> = vec![None; 6];
2293        for res in av.text_slices() {
2294            let (row_start, row_len, cols) = res.unwrap();
2295            let a = cols[0].as_any().downcast_ref::<StringArray>().unwrap();
2296            for i in 0..row_len {
2297                let idx = row_start + i;
2298                texts[idx] = if a.is_null(i) {
2299                    None
2300                } else {
2301                    Some(a.value(i).to_string())
2302                };
2303            }
2304        }
2305        assert_eq!(texts[1].as_deref(), Some("X"));
2306        assert!(texts[0].is_none());
2307        assert!(texts[2].is_none());
2308        assert!(texts[3].is_none());
2309        assert!(texts[4].is_none());
2310        assert!(texts[5].is_none());
2311    }
2312
2313    #[test]
2314    fn effective_slices_overlay_precedence_booleans() {
2315        // Base booleans over 1 column; overlays include boolean and non-boolean types.
2316        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
2317        for i in 0..6 {
2318            let v = if i % 2 == 0 {
2319                LiteralValue::Boolean(true)
2320            } else {
2321                LiteralValue::Boolean(false)
2322            };
2323            b.append_row(&[v]).unwrap();
2324        }
2325        let mut sheet = b.finish();
2326        // Overlays: row1 -> Boolean(true), row2 -> Text("T")
2327        let (c1, o1) = sheet.chunk_of_row(1).unwrap();
2328        sheet.columns[0].chunks[c1]
2329            .overlay
2330            .set(o1, OverlayValue::Boolean(true));
2331        let (c2, o2) = sheet.chunk_of_row(2).unwrap();
2332        sheet.columns[0].chunks[c2]
2333            .overlay
2334            .set(o2, OverlayValue::Text(Arc::from("T")));
2335
2336        let av = sheet.range_view(0, 0, 5, 0);
2337        // Validate booleans_slices: row1 should be true (overlay), row2 should be null (text overlay), others base
2338        let mut bools: Vec<Option<bool>> = vec![None; 6];
2339        for res in av.booleans_slices() {
2340            let (row_start, row_len, cols) = res.unwrap();
2341            let a = &cols[0];
2342            for i in 0..row_len {
2343                let idx = row_start + i;
2344                bools[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
2345            }
2346        }
2347        assert_eq!(bools[0], Some(true));
2348        assert_eq!(bools[1], Some(true)); // overlay to true
2349        assert_eq!(bools[2], None); // overshadowed by text overlay
2350        // spot-check others remain base
2351        assert_eq!(bools[3], Some(false));
2352    }
2353
2354    #[test]
2355    fn effective_slices_overlay_precedence_errors() {
2356        // Base numbers; overlay an error at one row and ensure errors_slices reflect it.
2357        let mut b = IngestBuilder::new("S", 1, 3, crate::engine::DateSystem::Excel1900);
2358        for i in 0..6 {
2359            b.append_row(&[LiteralValue::Number((i + 1) as f64)])
2360                .unwrap();
2361        }
2362        let mut sheet = b.finish();
2363        // Overlay error at row 4
2364        let (c4, o4) = sheet.chunk_of_row(4).unwrap();
2365        sheet.columns[0].chunks[c4]
2366            .overlay
2367            .set(o4, OverlayValue::Error(map_error_code(ExcelErrorKind::Div)));
2368
2369        let av = sheet.range_view(0, 0, 5, 0);
2370        let mut errs: Vec<Option<u8>> = vec![None; 6];
2371        for res in av.errors_slices() {
2372            let (row_start, row_len, cols) = res.unwrap();
2373            let a = &cols[0];
2374            for i in 0..row_len {
2375                let idx = row_start + i;
2376                errs[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
2377            }
2378        }
2379        assert_eq!(errs[4], Some(map_error_code(ExcelErrorKind::Div)));
2380        assert!(errs[3].is_none());
2381    }
2382}