Skip to main content

formualizer_eval/arrow_store/
mod.rs

1use arrow_array::Array;
2use arrow_array::new_null_array;
3use arrow_schema::DataType;
4use chrono::Timelike;
5use std::sync::Arc;
6
7use arrow_array::builder::{BooleanBuilder, Float64Builder, StringBuilder, UInt8Builder};
8use arrow_array::{ArrayRef, BooleanArray, Float64Array, StringArray, UInt8Array, UInt32Array};
9use once_cell::sync::OnceCell;
10
11use formualizer_common::{ExcelError, ExcelErrorKind, LiteralValue};
12use rustc_hash::FxHashMap;
13use std::collections::{BTreeMap, HashMap};
14
/// Compact type tag per row (UInt8 backing)
///
/// One tag is written for every cell of a column chunk and stored as a raw `u8`.
/// The discriminants are explicit because the byte value is what ends up in the
/// tag array and what `TypeTag::from_u8` decodes.
#[repr(u8)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum TypeTag {
    /// No value; every lane is null for this row.
    Empty = 0,
    /// Value lives in the numeric lane.
    Number = 1,
    /// Value lives in the boolean lane.
    Boolean = 2,
    /// Value lives in the text lane.
    Text = 3,
    /// Value is an error code in the error lane.
    Error = 4,
    /// Temporal value. Ingestion in this file stores it as a serial in the numeric lane.
    DateTime = 5, // reserved for future temporal lanes
    /// Duration value. Ingestion in this file stores it as a day fraction in the numeric lane.
    Duration = 6, // reserved
    /// Pending marker; ingestion writes the tag only and leaves every lane null.
    Pending = 7,
}
28
29impl TypeTag {
30    fn from_value(v: &LiteralValue) -> Self {
31        match v {
32            LiteralValue::Empty => TypeTag::Empty,
33            LiteralValue::Int(_) | LiteralValue::Number(_) => TypeTag::Number,
34            LiteralValue::Boolean(_) => TypeTag::Boolean,
35            LiteralValue::Text(_) => TypeTag::Text,
36            LiteralValue::Error(_) => TypeTag::Error,
37            LiteralValue::Date(_) | LiteralValue::DateTime(_) | LiteralValue::Time(_) => {
38                TypeTag::DateTime
39            }
40            LiteralValue::Duration(_) => TypeTag::Duration,
41            LiteralValue::Pending => TypeTag::Pending,
42            LiteralValue::Array(_) => TypeTag::Error, // arrays not storable in a single cell lane
43        }
44    }
45}
46
47impl TypeTag {
48    #[inline]
49    pub fn from_u8(b: u8) -> Self {
50        match b {
51            x if x == TypeTag::Empty as u8 => TypeTag::Empty,
52            x if x == TypeTag::Number as u8 => TypeTag::Number,
53            x if x == TypeTag::Boolean as u8 => TypeTag::Boolean,
54            x if x == TypeTag::Text as u8 => TypeTag::Text,
55            x if x == TypeTag::Error as u8 => TypeTag::Error,
56            x if x == TypeTag::DateTime as u8 => TypeTag::DateTime,
57            x if x == TypeTag::Duration as u8 => TypeTag::Duration,
58            x if x == TypeTag::Pending as u8 => TypeTag::Pending,
59            _ => TypeTag::Empty,
60        }
61    }
62}
63
/// Summary counters for one column chunk.
///
/// Filled in when ingestion finishes a chunk; `len` is also updated when a chunk
/// is grown in place.
#[derive(Debug, Clone, Copy, Default)]
pub struct ColumnChunkMeta {
    /// Number of rows in the chunk.
    pub len: usize,
    /// Non-null entries in the numeric lane. Ingestion also counts date/time and
    /// duration serials here because they are written to the numeric lane.
    pub non_null_num: usize,
    /// Non-null entries in the boolean lane.
    pub non_null_bool: usize,
    /// Non-null entries in the text lane.
    pub non_null_text: usize,
    /// Non-null entries in the error lane.
    pub non_null_err: usize,
}
72
/// One chunk of a column, stored as parallel "lanes" plus a per-row type tag.
///
/// Each lane is optional: ingestion leaves a lane as `None` when the chunk holds no
/// value of that kind. The `*_or_null` accessors substitute a cached all-null array
/// for a missing lane.
#[derive(Debug, Clone)]
pub struct ColumnChunk {
    /// Numeric lane (also holds temporal serials written at ingest).
    pub numbers: Option<Arc<Float64Array>>,
    /// Boolean lane.
    pub booleans: Option<Arc<BooleanArray>>,
    /// Text lane.
    pub text: Option<ArrayRef>,          // Utf8 for Phase A
    /// Error lane.
    pub errors: Option<Arc<UInt8Array>>, // compact error code (UInt8)
    /// Per-row `TypeTag` bytes; its length defines the chunk length.
    pub type_tag: Arc<UInt8Array>,
    /// Ingestion in this file always sets this to `None`.
    pub formula_id: Option<Arc<UInt32Array>>, // reserved for Phase A+
    /// Row count and per-lane non-null counters.
    pub meta: ColumnChunkMeta,
    // Lazy null providers (per-chunk)
    // Each cell caches the all-null array handed out when the matching lane is absent.
    lazy_null_numbers: OnceCell<Arc<Float64Array>>,
    lazy_null_booleans: OnceCell<Arc<BooleanArray>>,
    lazy_null_text: OnceCell<ArrayRef>,
    lazy_null_errors: OnceCell<Arc<UInt8Array>>,
    // Cache: lowered text lane, nulls preserved
    lowered_text: OnceCell<ArrayRef>,
    // Phase C: per-chunk overlay (delta edits since last compaction)
    pub overlay: Overlay,
    // Phase 0/1: separate computed overlay (formula/spill outputs)
    pub computed_overlay: Overlay,
}
94
95impl ColumnChunk {
96    #[inline]
97    pub fn len(&self) -> usize {
98        self.type_tag.len()
99    }
100    #[inline]
101    pub fn is_empty(&self) -> bool {
102        self.len() == 0
103    }
104    #[inline]
105    pub fn numbers_or_null(&self) -> Arc<Float64Array> {
106        if let Some(a) = &self.numbers {
107            return a.clone();
108        }
109        self.lazy_null_numbers
110            .get_or_init(|| {
111                let arr = new_null_array(&DataType::Float64, self.len());
112                Arc::new(arr.as_any().downcast_ref::<Float64Array>().unwrap().clone())
113            })
114            .clone()
115    }
116    #[inline]
117    pub fn booleans_or_null(&self) -> Arc<BooleanArray> {
118        if let Some(a) = &self.booleans {
119            return a.clone();
120        }
121        self.lazy_null_booleans
122            .get_or_init(|| {
123                let arr = new_null_array(&DataType::Boolean, self.len());
124                Arc::new(arr.as_any().downcast_ref::<BooleanArray>().unwrap().clone())
125            })
126            .clone()
127    }
128    #[inline]
129    pub fn errors_or_null(&self) -> Arc<UInt8Array> {
130        if let Some(a) = &self.errors {
131            return a.clone();
132        }
133        self.lazy_null_errors
134            .get_or_init(|| {
135                let arr = new_null_array(&DataType::UInt8, self.len());
136                Arc::new(arr.as_any().downcast_ref::<UInt8Array>().unwrap().clone())
137            })
138            .clone()
139    }
140    #[inline]
141    pub fn text_or_null(&self) -> ArrayRef {
142        if let Some(a) = &self.text {
143            return a.clone();
144        }
145        self.lazy_null_text
146            .get_or_init(|| new_null_array(&DataType::Utf8, self.len()))
147            .clone()
148    }
149
150    /// Lowercased text lane, with nulls preserved. Cached per chunk.
151    pub fn text_lower_or_null(&self) -> ArrayRef {
152        if let Some(a) = self.lowered_text.get() {
153            return a.clone();
154        }
155        // Lowercase when text present; else return null Utf8
156        let out: ArrayRef = if let Some(txt) = &self.text {
157            let sa = txt.as_any().downcast_ref::<StringArray>().unwrap();
158            let mut b = arrow_array::builder::StringBuilder::with_capacity(sa.len(), sa.len() * 8);
159            for i in 0..sa.len() {
160                if sa.is_null(i) {
161                    b.append_null();
162                } else {
163                    b.append_value(sa.value(i).to_lowercase());
164                }
165            }
166            let lowered = b.finish();
167            Arc::new(lowered)
168        } else {
169            new_null_array(&DataType::Utf8, self.len())
170        };
171        self.lowered_text.get_or_init(|| out.clone());
172        out
173    }
174
175    /// Grow this chunk's logical length to `new_len` (padding with empty/null values).
176    ///
177    /// This is used to keep already-materialized chunks consistent when `ArrowSheet::nrows`
178    /// grows incrementally inside the current last chunk.
179    pub fn grow_len_to(&mut self, new_len: usize) {
180        let old_len = self.len();
181        if new_len <= old_len {
182            return;
183        }
184
185        // Grow type tags (pad with Empty).
186        let mut tags: Vec<u8> = self.type_tag.values().to_vec();
187        tags.resize(new_len, TypeTag::Empty as u8);
188        self.type_tag = Arc::new(UInt8Array::from(tags));
189
190        // Grow lanes when present; append nulls for new rows.
191        if let Some(a) = &self.numbers {
192            use arrow_array::builder::Float64Builder;
193            let mut b = Float64Builder::with_capacity(new_len);
194            for i in 0..old_len {
195                if a.is_null(i) {
196                    b.append_null();
197                } else {
198                    b.append_value(a.value(i));
199                }
200            }
201            for _ in old_len..new_len {
202                b.append_null();
203            }
204            self.numbers = Some(Arc::new(b.finish()));
205        }
206        if let Some(a) = &self.booleans {
207            use arrow_array::builder::BooleanBuilder;
208            let mut b = BooleanBuilder::with_capacity(new_len);
209            for i in 0..old_len {
210                if a.is_null(i) {
211                    b.append_null();
212                } else {
213                    b.append_value(a.value(i));
214                }
215            }
216            for _ in old_len..new_len {
217                b.append_null();
218            }
219            self.booleans = Some(Arc::new(b.finish()));
220        }
221        if let Some(a) = &self.errors {
222            use arrow_array::builder::UInt8Builder;
223            let mut b = UInt8Builder::with_capacity(new_len);
224            for i in 0..old_len {
225                if a.is_null(i) {
226                    b.append_null();
227                } else {
228                    b.append_value(a.value(i));
229                }
230            }
231            for _ in old_len..new_len {
232                b.append_null();
233            }
234            self.errors = Some(Arc::new(b.finish()));
235        }
236        if let Some(a) = &self.text {
237            use arrow_array::builder::StringBuilder;
238            let sa = a.as_any().downcast_ref::<StringArray>().unwrap();
239            let mut b = StringBuilder::with_capacity(new_len, 0);
240            for i in 0..old_len {
241                if sa.is_null(i) {
242                    b.append_null();
243                } else {
244                    b.append_value(sa.value(i));
245                }
246            }
247            for _ in old_len..new_len {
248                b.append_null();
249            }
250            self.text = Some(Arc::new(b.finish()) as ArrayRef);
251        }
252
253        // Length-dependent caches must be dropped.
254        self.lazy_null_numbers = OnceCell::new();
255        self.lazy_null_booleans = OnceCell::new();
256        self.lazy_null_text = OnceCell::new();
257        self.lazy_null_errors = OnceCell::new();
258        self.lowered_text = OnceCell::new();
259
260        self.meta.len = new_len;
261    }
262}
263
/// One sheet column: a dense run of chunks plus optional sparse chunks.
#[derive(Debug, Clone)]
pub struct ArrowColumn {
    /// Dense chunks, addressed by chunk index `0..chunks.len()`.
    pub chunks: Vec<ColumnChunk>,
    /// Chunks keyed by chunk index; consulted for indices at or beyond `chunks.len()`.
    pub sparse_chunks: FxHashMap<usize, ColumnChunk>,
    /// Column position; ingestion sets it to the column's index in the sheet.
    pub index: u32,
}
270
271impl ArrowColumn {
272    #[inline]
273    pub fn chunk(&self, idx: usize) -> Option<&ColumnChunk> {
274        if idx < self.chunks.len() {
275            Some(&self.chunks[idx])
276        } else {
277            self.sparse_chunks.get(&idx)
278        }
279    }
280
281    #[inline]
282    pub fn chunk_mut(&mut self, idx: usize) -> Option<&mut ColumnChunk> {
283        if idx < self.chunks.len() {
284            Some(&mut self.chunks[idx])
285        } else {
286            self.sparse_chunks.get_mut(&idx)
287        }
288    }
289
290    #[inline]
291    pub fn has_sparse_chunks(&self) -> bool {
292        !self.sparse_chunks.is_empty()
293    }
294
295    #[inline]
296    pub fn total_chunk_count(&self) -> usize {
297        self.chunks.len() + self.sparse_chunks.len()
298    }
299}
300
/// A sheet stored column-wise as chunked Arrow arrays.
#[derive(Debug, Clone)]
pub struct ArrowSheet {
    /// Sheet name.
    pub name: Arc<str>,
    /// Columns in sheet order.
    pub columns: Vec<ArrowColumn>,
    /// Total number of rows.
    pub nrows: u32,
    /// Starting row offset of each dense chunk (running sum of chunk lengths).
    pub chunk_starts: Vec<usize>,
    /// Preferred chunk size (rows) for capacity growth operations.
    ///
    /// For Arrow-ingested sheets this matches the ingest `chunk_rows`. For sparse/overlay-created
    /// sheets this defaults to 32k to avoid creating thousands of tiny chunks during growth.
    pub chunk_rows: usize,
}
313
/// Collection of sheets, looked up by name.
#[derive(Debug, Default, Clone)]
pub struct SheetStore {
    /// All sheets held by the store.
    pub sheets: Vec<ArrowSheet>,
}
318
319impl SheetStore {
320    pub fn sheet(&self, name: &str) -> Option<&ArrowSheet> {
321        self.sheets.iter().find(|s| s.name.as_ref() == name)
322    }
323    pub fn sheet_mut(&mut self, name: &str) -> Option<&mut ArrowSheet> {
324        self.sheets.iter_mut().find(|s| s.name.as_ref() == name)
325    }
326}
327
/// Ingestion builder that writes per-column Arrow arrays with a lane/tag design.
///
/// Rows are appended one at a time. Every append writes one entry to each of a
/// column's lane builders (a value in at most one lane, nulls elsewhere) plus a
/// type tag. Once `chunk_rows` rows have accumulated, the builders are sealed into
/// a `ColumnChunk` per column.
pub struct IngestBuilder {
    /// Name given to the resulting sheet.
    name: Arc<str>,
    /// Expected width of every appended row.
    ncols: usize,
    /// Rows per chunk (at least 1).
    chunk_rows: usize,
    /// Date system used when converting dates and date-times to serials.
    date_system: crate::engine::DateSystem,

    // Per-column active builders for current chunk
    num_builders: Vec<Float64Builder>,
    bool_builders: Vec<BooleanBuilder>,
    text_builders: Vec<StringBuilder>,
    err_builders: Vec<UInt8Builder>,
    tag_builders: Vec<UInt8Builder>,

    // Per-column per-lane non-null counters for current chunk
    lane_counts: Vec<LaneCounts>,

    // Accumulated chunks
    chunks: Vec<Vec<ColumnChunk>>, // indexed by col
    /// Rows appended to the chunk currently being built.
    row_in_chunk: usize,
    /// Rows appended overall.
    total_rows: u32,
}
350
/// Per-column count of non-null values written to each lane of the chunk being built.
///
/// A lane whose count is still zero when the chunk is sealed is stored as `None`.
#[derive(Debug, Clone, Copy, Default)]
struct LaneCounts {
    /// Values written to the numeric lane (includes temporal serials).
    n_num: usize,
    /// Values written to the boolean lane.
    n_bool: usize,
    /// Values written to the text lane.
    n_text: usize,
    /// Values written to the error lane.
    n_err: usize,
}
358
359impl IngestBuilder {
360    pub fn new(
361        sheet_name: &str,
362        ncols: usize,
363        chunk_rows: usize,
364        date_system: crate::engine::DateSystem,
365    ) -> Self {
366        let mut chunks = Vec::with_capacity(ncols);
367        chunks.resize_with(ncols, Vec::new);
368        Self {
369            name: Arc::from(sheet_name.to_string()),
370            ncols,
371            chunk_rows: chunk_rows.max(1),
372            date_system,
373            num_builders: (0..ncols)
374                .map(|_| Float64Builder::with_capacity(chunk_rows))
375                .collect(),
376            bool_builders: (0..ncols)
377                .map(|_| BooleanBuilder::with_capacity(chunk_rows))
378                .collect(),
379            text_builders: (0..ncols)
380                .map(|_| StringBuilder::with_capacity(chunk_rows, chunk_rows * 12))
381                .collect(),
382            err_builders: (0..ncols)
383                .map(|_| UInt8Builder::with_capacity(chunk_rows))
384                .collect(),
385            tag_builders: (0..ncols)
386                .map(|_| UInt8Builder::with_capacity(chunk_rows))
387                .collect(),
388            lane_counts: vec![LaneCounts::default(); ncols],
389            chunks,
390            row_in_chunk: 0,
391            total_rows: 0,
392        }
393    }
394
395    /// Zero-allocation row append from typed cell tokens (no LiteralValue).
396    /// Text borrows are copied into the internal StringBuilder.
397    pub fn append_row_cells<'a>(&mut self, row: &[CellIngest<'a>]) -> Result<(), ExcelError> {
398        assert_eq!(row.len(), self.ncols, "row width mismatch");
399        for (c, cell) in row.iter().enumerate() {
400            match cell {
401                CellIngest::Empty => {
402                    self.tag_builders[c].append_value(TypeTag::Empty as u8);
403                    self.num_builders[c].append_null();
404                    self.bool_builders[c].append_null();
405                    self.text_builders[c].append_null();
406                    self.err_builders[c].append_null();
407                }
408                CellIngest::Number(n) => {
409                    self.tag_builders[c].append_value(TypeTag::Number as u8);
410                    self.num_builders[c].append_value(*n);
411                    self.lane_counts[c].n_num += 1;
412                    self.bool_builders[c].append_null();
413                    self.text_builders[c].append_null();
414                    self.err_builders[c].append_null();
415                }
416                CellIngest::Boolean(b) => {
417                    self.tag_builders[c].append_value(TypeTag::Boolean as u8);
418                    self.num_builders[c].append_null();
419                    self.bool_builders[c].append_value(*b);
420                    self.lane_counts[c].n_bool += 1;
421                    self.text_builders[c].append_null();
422                    self.err_builders[c].append_null();
423                }
424                CellIngest::Text(s) => {
425                    self.tag_builders[c].append_value(TypeTag::Text as u8);
426                    self.num_builders[c].append_null();
427                    self.bool_builders[c].append_null();
428                    self.text_builders[c].append_value(s);
429                    self.lane_counts[c].n_text += 1;
430                    self.err_builders[c].append_null();
431                }
432                CellIngest::ErrorCode(code) => {
433                    self.tag_builders[c].append_value(TypeTag::Error as u8);
434                    self.num_builders[c].append_null();
435                    self.bool_builders[c].append_null();
436                    self.text_builders[c].append_null();
437                    self.err_builders[c].append_value(*code);
438                    self.lane_counts[c].n_err += 1;
439                }
440                CellIngest::DateSerial(serial) => {
441                    self.tag_builders[c].append_value(TypeTag::DateTime as u8);
442                    self.num_builders[c].append_value(*serial);
443                    self.lane_counts[c].n_num += 1;
444                    self.bool_builders[c].append_null();
445                    self.text_builders[c].append_null();
446                    self.err_builders[c].append_null();
447                }
448                CellIngest::Pending => {
449                    self.tag_builders[c].append_value(TypeTag::Pending as u8);
450                    self.num_builders[c].append_null();
451                    self.bool_builders[c].append_null();
452                    self.text_builders[c].append_null();
453                    self.err_builders[c].append_null();
454                }
455            }
456        }
457        self.row_in_chunk += 1;
458        self.total_rows += 1;
459        if self.row_in_chunk >= self.chunk_rows {
460            self.finish_chunk();
461        }
462        Ok(())
463    }
464
465    /// Streaming row append from an iterator of typed cell tokens.
466    /// Requires an `ExactSizeIterator` to validate row width without materializing a Vec.
467    pub fn append_row_cells_iter<'a, I>(&mut self, iter: I) -> Result<(), ExcelError>
468    where
469        I: ExactSizeIterator<Item = CellIngest<'a>>,
470    {
471        assert_eq!(iter.len(), self.ncols, "row width mismatch");
472        for (c, cell) in iter.enumerate() {
473            match cell {
474                CellIngest::Empty => {
475                    self.tag_builders[c].append_value(TypeTag::Empty as u8);
476                    self.num_builders[c].append_null();
477                    self.bool_builders[c].append_null();
478                    self.text_builders[c].append_null();
479                    self.err_builders[c].append_null();
480                }
481                CellIngest::Number(n) => {
482                    self.tag_builders[c].append_value(TypeTag::Number as u8);
483                    self.num_builders[c].append_value(n);
484                    self.lane_counts[c].n_num += 1;
485                    self.bool_builders[c].append_null();
486                    self.text_builders[c].append_null();
487                    self.err_builders[c].append_null();
488                }
489                CellIngest::Boolean(b) => {
490                    self.tag_builders[c].append_value(TypeTag::Boolean as u8);
491                    self.num_builders[c].append_null();
492                    self.bool_builders[c].append_value(b);
493                    self.lane_counts[c].n_bool += 1;
494                    self.text_builders[c].append_null();
495                    self.err_builders[c].append_null();
496                }
497                CellIngest::Text(s) => {
498                    self.tag_builders[c].append_value(TypeTag::Text as u8);
499                    self.num_builders[c].append_null();
500                    self.bool_builders[c].append_null();
501                    self.text_builders[c].append_value(s);
502                    self.lane_counts[c].n_text += 1;
503                    self.err_builders[c].append_null();
504                }
505                CellIngest::ErrorCode(code) => {
506                    self.tag_builders[c].append_value(TypeTag::Error as u8);
507                    self.num_builders[c].append_null();
508                    self.bool_builders[c].append_null();
509                    self.text_builders[c].append_null();
510                    self.err_builders[c].append_value(code);
511                    self.lane_counts[c].n_err += 1;
512                }
513                CellIngest::DateSerial(serial) => {
514                    self.tag_builders[c].append_value(TypeTag::DateTime as u8);
515                    self.num_builders[c].append_value(serial);
516                    self.lane_counts[c].n_num += 1;
517                    self.bool_builders[c].append_null();
518                    self.text_builders[c].append_null();
519                    self.err_builders[c].append_null();
520                }
521                CellIngest::Pending => {
522                    self.tag_builders[c].append_value(TypeTag::Pending as u8);
523                    self.num_builders[c].append_null();
524                    self.bool_builders[c].append_null();
525                    self.text_builders[c].append_null();
526                    self.err_builders[c].append_null();
527                }
528            }
529        }
530        self.row_in_chunk += 1;
531        self.total_rows += 1;
532        if self.row_in_chunk >= self.chunk_rows {
533            self.finish_chunk();
534        }
535        Ok(())
536    }
537
538    /// Append a single row of values. Length must match `ncols`.
539    pub fn append_row(&mut self, row: &[LiteralValue]) -> Result<(), ExcelError> {
540        assert_eq!(row.len(), self.ncols, "row width mismatch");
541
542        for (c, v) in row.iter().enumerate() {
543            let tag = TypeTag::from_value(v) as u8;
544            self.tag_builders[c].append_value(tag);
545
546            match v {
547                LiteralValue::Empty => {
548                    self.num_builders[c].append_null();
549                    self.bool_builders[c].append_null();
550                    self.text_builders[c].append_null();
551                    self.err_builders[c].append_null();
552                }
553                LiteralValue::Int(i) => {
554                    self.num_builders[c].append_value(*i as f64);
555                    self.lane_counts[c].n_num += 1;
556                    self.bool_builders[c].append_null();
557                    self.text_builders[c].append_null();
558                    self.err_builders[c].append_null();
559                }
560                LiteralValue::Number(n) => {
561                    self.num_builders[c].append_value(*n);
562                    self.lane_counts[c].n_num += 1;
563                    self.bool_builders[c].append_null();
564                    self.text_builders[c].append_null();
565                    self.err_builders[c].append_null();
566                }
567                LiteralValue::Boolean(b) => {
568                    self.num_builders[c].append_null();
569                    self.bool_builders[c].append_value(*b);
570                    self.lane_counts[c].n_bool += 1;
571                    self.text_builders[c].append_null();
572                    self.err_builders[c].append_null();
573                }
574                LiteralValue::Text(s) => {
575                    self.num_builders[c].append_null();
576                    self.bool_builders[c].append_null();
577                    self.text_builders[c].append_value(s);
578                    self.lane_counts[c].n_text += 1;
579                    self.err_builders[c].append_null();
580                }
581                LiteralValue::Error(e) => {
582                    self.num_builders[c].append_null();
583                    self.bool_builders[c].append_null();
584                    self.text_builders[c].append_null();
585                    self.err_builders[c].append_value(map_error_code(e.kind));
586                    self.lane_counts[c].n_err += 1;
587                }
588                // Phase A: coerce temporal to serials in numeric lane with DateTime tag
589                LiteralValue::Date(d) => {
590                    let dt = d.and_hms_opt(0, 0, 0).unwrap();
591                    let serial =
592                        crate::builtins::datetime::datetime_to_serial_for(self.date_system, &dt);
593                    self.num_builders[c].append_value(serial);
594                    self.lane_counts[c].n_num += 1;
595                    self.bool_builders[c].append_null();
596                    self.text_builders[c].append_null();
597                    self.err_builders[c].append_null();
598                }
599                LiteralValue::DateTime(dt) => {
600                    let serial =
601                        crate::builtins::datetime::datetime_to_serial_for(self.date_system, dt);
602                    self.num_builders[c].append_value(serial);
603                    self.lane_counts[c].n_num += 1;
604                    self.bool_builders[c].append_null();
605                    self.text_builders[c].append_null();
606                    self.err_builders[c].append_null();
607                }
608                LiteralValue::Time(t) => {
609                    let serial = t.num_seconds_from_midnight() as f64 / 86_400.0;
610                    self.num_builders[c].append_value(serial);
611                    self.lane_counts[c].n_num += 1;
612                    self.bool_builders[c].append_null();
613                    self.text_builders[c].append_null();
614                    self.err_builders[c].append_null();
615                }
616                LiteralValue::Duration(dur) => {
617                    let serial = dur.num_seconds() as f64 / 86_400.0;
618                    self.num_builders[c].append_value(serial);
619                    self.lane_counts[c].n_num += 1;
620                    self.bool_builders[c].append_null();
621                    self.text_builders[c].append_null();
622                    self.err_builders[c].append_null();
623                }
624                LiteralValue::Array(_) => {
625                    // Not allowed as a stored scalar; mark as error kind VALUE
626                    self.num_builders[c].append_null();
627                    self.bool_builders[c].append_null();
628                    self.text_builders[c].append_null();
629                    self.err_builders[c].append_value(map_error_code(ExcelErrorKind::Value));
630                    self.lane_counts[c].n_err += 1;
631                }
632                LiteralValue::Pending => {
633                    // Pending: tag only; all lanes remain null (no error)
634                    self.num_builders[c].append_null();
635                    self.bool_builders[c].append_null();
636                    self.text_builders[c].append_null();
637                    self.err_builders[c].append_null();
638                }
639            }
640        }
641
642        self.row_in_chunk += 1;
643        self.total_rows += 1;
644
645        if self.row_in_chunk >= self.chunk_rows {
646            self.finish_chunk();
647        }
648
649        Ok(())
650    }
651
652    fn finish_chunk(&mut self) {
653        if self.row_in_chunk == 0 {
654            return;
655        }
656        for c in 0..self.ncols {
657            let len = self.row_in_chunk;
658            let numbers_arc: Option<Arc<Float64Array>> = if self.lane_counts[c].n_num == 0 {
659                None
660            } else {
661                Some(Arc::new(self.num_builders[c].finish()))
662            };
663            let booleans_arc: Option<Arc<BooleanArray>> = if self.lane_counts[c].n_bool == 0 {
664                None
665            } else {
666                Some(Arc::new(self.bool_builders[c].finish()))
667            };
668            let text_ref: Option<ArrayRef> = if self.lane_counts[c].n_text == 0 {
669                None
670            } else {
671                Some(Arc::new(self.text_builders[c].finish()))
672            };
673            let errors_arc: Option<Arc<UInt8Array>> = if self.lane_counts[c].n_err == 0 {
674                None
675            } else {
676                Some(Arc::new(self.err_builders[c].finish()))
677            };
678            let tags: UInt8Array = self.tag_builders[c].finish();
679
680            let chunk = ColumnChunk {
681                numbers: numbers_arc,
682                booleans: booleans_arc,
683                text: text_ref,
684                errors: errors_arc,
685                type_tag: Arc::new(tags),
686                formula_id: None,
687                meta: ColumnChunkMeta {
688                    len,
689                    non_null_num: self.lane_counts[c].n_num,
690                    non_null_bool: self.lane_counts[c].n_bool,
691                    non_null_text: self.lane_counts[c].n_text,
692                    non_null_err: self.lane_counts[c].n_err,
693                },
694                lazy_null_numbers: OnceCell::new(),
695                lazy_null_booleans: OnceCell::new(),
696                lazy_null_text: OnceCell::new(),
697                lazy_null_errors: OnceCell::new(),
698                lowered_text: OnceCell::new(),
699                overlay: Overlay::new(),
700                computed_overlay: Overlay::new(),
701            };
702            self.chunks[c].push(chunk);
703
704            // re-init builders for next chunk
705            self.num_builders[c] = Float64Builder::with_capacity(self.chunk_rows);
706            self.bool_builders[c] = BooleanBuilder::with_capacity(self.chunk_rows);
707            self.text_builders[c] =
708                StringBuilder::with_capacity(self.chunk_rows, self.chunk_rows * 12);
709            self.err_builders[c] = UInt8Builder::with_capacity(self.chunk_rows);
710            self.tag_builders[c] = UInt8Builder::with_capacity(self.chunk_rows);
711            self.lane_counts[c] = LaneCounts::default();
712        }
713        self.row_in_chunk = 0;
714    }
715
716    pub fn finish(mut self) -> ArrowSheet {
717        // flush partial chunk
718        if self.row_in_chunk > 0 {
719            self.finish_chunk();
720        }
721
722        let mut columns = Vec::with_capacity(self.ncols);
723        for (idx, chunks) in self.chunks.into_iter().enumerate() {
724            columns.push(ArrowColumn {
725                chunks,
726                sparse_chunks: FxHashMap::default(),
727                index: idx as u32,
728            });
729        }
730        // Precompute chunk starts from first column and enforce alignment across columns
731        let mut chunk_starts: Vec<usize> = Vec::new();
732        if let Some(col0) = columns.first() {
733            let chunks_len0 = col0.chunks.len();
734            for (ci, col) in columns.iter().enumerate() {
735                if col.chunks.len() != chunks_len0 {
736                    panic!(
737                        "ArrowSheet chunk misalignment: column {} chunks={} != {}",
738                        ci,
739                        col.chunks.len(),
740                        chunks_len0
741                    );
742                }
743            }
744            let mut cur = 0usize;
745            for i in 0..chunks_len0 {
746                let len_i = col0.chunks[i].type_tag.len();
747                for (ci, col) in columns.iter().enumerate() {
748                    let got = col.chunks[i].type_tag.len();
749                    if got != len_i {
750                        panic!(
751                            "ArrowSheet chunk row-length misalignment at chunk {i}: col {ci} len={got} != {len_i}"
752                        );
753                    }
754                }
755                chunk_starts.push(cur);
756                cur += len_i;
757            }
758        }
759        ArrowSheet {
760            name: self.name,
761            columns,
762            nrows: self.total_rows,
763            chunk_starts,
764            chunk_rows: self.chunk_rows,
765        }
766    }
767}
768
769pub fn map_error_code(kind: ExcelErrorKind) -> u8 {
770    match kind {
771        ExcelErrorKind::Null => 1,
772        ExcelErrorKind::Ref => 2,
773        ExcelErrorKind::Name => 3,
774        ExcelErrorKind::Value => 4,
775        ExcelErrorKind::Div => 5,
776        ExcelErrorKind::Na => 6,
777        ExcelErrorKind::Num => 7,
778        ExcelErrorKind::Error => 8,
779        ExcelErrorKind::NImpl => 9,
780        ExcelErrorKind::Spill => 10,
781        ExcelErrorKind::Calc => 11,
782        ExcelErrorKind::Circ => 12,
783        ExcelErrorKind::Cancelled => 13,
784    }
785}
786
787pub fn unmap_error_code(code: u8) -> ExcelErrorKind {
788    match code {
789        1 => ExcelErrorKind::Null,
790        2 => ExcelErrorKind::Ref,
791        3 => ExcelErrorKind::Name,
792        4 => ExcelErrorKind::Value,
793        5 => ExcelErrorKind::Div,
794        6 => ExcelErrorKind::Na,
795        7 => ExcelErrorKind::Num,
796        8 => ExcelErrorKind::Error,
797        9 => ExcelErrorKind::NImpl,
798        10 => ExcelErrorKind::Spill,
799        11 => ExcelErrorKind::Calc,
800        12 => ExcelErrorKind::Circ,
801        13 => ExcelErrorKind::Cancelled,
802        _ => ExcelErrorKind::Error,
803    }
804}
805
806// ─────────────────────────── Overlay (Phase C) ────────────────────────────
807
/// Zero-allocation cell token for ingestion.
///
/// Text is borrowed for `'a` rather than owned, so no variant carries a heap
/// allocation of its own.
pub enum CellIngest<'a> {
    /// Cell without a value.
    Empty,
    /// Numeric payload.
    Number(f64),
    /// Boolean payload.
    Boolean(bool),
    /// Text borrowed from the caller for `'a`.
    Text(&'a str),
    /// Error carried as a `u8` code.
    /// NOTE(review): presumably the encoding produced by `map_error_code` — confirm.
    ErrorCode(u8),
    /// Date carried as an `f64` serial.
    /// NOTE(review): presumably an Excel serial number — confirm against the ingest path.
    DateSerial(f64),
    /// Payload-free marker; presumably the ingest counterpart of a pending value — confirm.
    Pending,
}
818
/// A single scalar stored in an overlay.
///
/// Each variant maps to exactly one `TypeTag` (see `OverlayValue::type_tag`).
/// The three numeric-like variants all expose their `f64` through
/// `numeric_lane_value`.
#[derive(Debug, Clone, PartialEq)]
pub enum OverlayValue {
    /// No value.
    Empty,
    /// Plain number.
    Number(f64),
    /// Date/Time/DateTime stored as an Excel serial in the numeric lane.
    DateTime(f64),
    /// Duration stored as an Excel-style day-fraction in the numeric lane.
    Duration(f64),
    /// Boolean value.
    Boolean(bool),
    /// Text held behind an `Arc<str>`.
    Text(Arc<str>),
    /// Error stored as a `u8` code; decoded with `unmap_error_code` when
    /// converted back to a literal.
    Error(u8),
    /// Pending marker; converts to `LiteralValue::Pending`.
    Pending,
}
832
833impl OverlayValue {
834    #[inline]
835    pub(crate) fn estimated_payload_bytes(&self) -> usize {
836        match self {
837            OverlayValue::Empty | OverlayValue::Pending => 0,
838            OverlayValue::Number(_) | OverlayValue::DateTime(_) | OverlayValue::Duration(_) => {
839                core::mem::size_of::<f64>()
840            }
841            OverlayValue::Boolean(_) => core::mem::size_of::<bool>(),
842            OverlayValue::Error(_) => core::mem::size_of::<u8>(),
843            // Deterministic estimate: count string bytes only.
844            OverlayValue::Text(s) => s.len(),
845        }
846    }
847
848    #[inline]
849    pub(crate) fn type_tag(&self) -> TypeTag {
850        match self {
851            OverlayValue::Empty => TypeTag::Empty,
852            OverlayValue::Number(_) => TypeTag::Number,
853            OverlayValue::DateTime(_) => TypeTag::DateTime,
854            OverlayValue::Duration(_) => TypeTag::Duration,
855            OverlayValue::Boolean(_) => TypeTag::Boolean,
856            OverlayValue::Text(_) => TypeTag::Text,
857            OverlayValue::Error(_) => TypeTag::Error,
858            OverlayValue::Pending => TypeTag::Pending,
859        }
860    }
861
862    #[inline]
863    pub(crate) fn numeric_lane_value(&self) -> Option<f64> {
864        match self {
865            OverlayValue::Number(n) | OverlayValue::DateTime(n) | OverlayValue::Duration(n) => {
866                Some(*n)
867            }
868            _ => None,
869        }
870    }
871
872    #[inline]
873    pub(crate) fn boolean_lane_value(&self) -> Option<bool> {
874        match self {
875            OverlayValue::Boolean(b) => Some(*b),
876            _ => None,
877        }
878    }
879
880    #[inline]
881    pub(crate) fn text_lane_value(&self) -> Option<&str> {
882        match self {
883            OverlayValue::Text(s) => Some(s.as_ref()),
884            _ => None,
885        }
886    }
887
888    #[inline]
889    pub(crate) fn error_lane_value(&self) -> Option<u8> {
890        match self {
891            OverlayValue::Error(code) => Some(*code),
892            _ => None,
893        }
894    }
895
896    pub(crate) fn lowered_text_value(&self) -> Option<String> {
897        match self {
898            OverlayValue::Text(s) => Some(s.to_lowercase()),
899            OverlayValue::Number(n) | OverlayValue::DateTime(n) | OverlayValue::Duration(n) => {
900                Some(n.to_string())
901            }
902            OverlayValue::Boolean(b) => Some(if *b { "true" } else { "false" }.to_string()),
903            OverlayValue::Empty | OverlayValue::Error(_) | OverlayValue::Pending => None,
904        }
905    }
906
907    pub(crate) fn to_literal(&self) -> LiteralValue {
908        match self {
909            OverlayValue::Empty => LiteralValue::Empty,
910            OverlayValue::Number(n) => LiteralValue::Number(*n),
911            OverlayValue::DateTime(serial) => LiteralValue::from_serial_number(*serial),
912            OverlayValue::Duration(serial) => {
913                let nanos_f = *serial * 86_400.0 * 1_000_000_000.0;
914                let nanos = nanos_f.round().clamp(i64::MIN as f64, i64::MAX as f64) as i64;
915                LiteralValue::Duration(chrono::Duration::nanoseconds(nanos))
916            }
917            OverlayValue::Boolean(b) => LiteralValue::Boolean(*b),
918            OverlayValue::Text(s) => LiteralValue::Text((**s).to_string()),
919            OverlayValue::Error(code) => {
920                LiteralValue::Error(ExcelError::new(unmap_error_code(*code)))
921            }
922            OverlayValue::Pending => LiteralValue::Pending,
923        }
924    }
925}
926
/// An overlay value that is either borrowed from existing storage or owned.
///
/// Both variants expose the same accessors, which forward to the wrapped
/// `OverlayValue`.
#[derive(Debug, Clone)]
pub(crate) enum OverlayScalar<'a> {
    /// Reference to a value that lives for `'a`.
    Borrowed(&'a OverlayValue),
    /// Value owned by the scalar itself.
    Owned(OverlayValue),
}
932
933impl<'a> OverlayScalar<'a> {
934    #[inline]
935    fn as_value(&self) -> &OverlayValue {
936        match self {
937            OverlayScalar::Borrowed(value) => value,
938            OverlayScalar::Owned(value) => value,
939        }
940    }
941
942    #[inline]
943    pub(crate) fn to_overlay_value(&self) -> OverlayValue {
944        self.as_value().clone()
945    }
946
947    #[inline]
948    pub(crate) fn type_tag(&self) -> TypeTag {
949        self.as_value().type_tag()
950    }
951
952    #[inline]
953    pub(crate) fn numeric_lane_value(&self) -> Option<f64> {
954        self.as_value().numeric_lane_value()
955    }
956
957    #[inline]
958    pub(crate) fn boolean_lane_value(&self) -> Option<bool> {
959        self.as_value().boolean_lane_value()
960    }
961
962    #[inline]
963    pub(crate) fn text_lane_value(&self) -> Option<&str> {
964        self.as_value().text_lane_value()
965    }
966
967    #[inline]
968    pub(crate) fn error_lane_value(&self) -> Option<u8> {
969        self.as_value().error_lane_value()
970    }
971
972    pub(crate) fn lowered_text_value(&self) -> Option<String> {
973        self.as_value().lowered_text_value()
974    }
975
976    pub(crate) fn to_literal(&self) -> LiteralValue {
977        self.as_value().to_literal()
978    }
979}
980
// Fixed byte overhead for one overlay entry.
// NOTE(review): not referenced in this part of the file; presumably added per
// point entry when estimating overlay memory — confirm at the use site.
const OVERLAY_ENTRY_BASE_BYTES: usize = 32;
// Fixed byte overhead added once per `OverlayFragment` by
// `OverlayFragment::estimated_bytes`, on top of index and payload bytes.
const OVERLAY_FRAGMENT_BASE_BYTES: usize = 48;
983
/// Values of an overlay fragment, stored as one type-tag array plus optional
/// per-type value arrays ("lanes").
///
/// `from_values` keeps a lane only when the lane-append helper reported a
/// non-zero count for it; otherwise the field is `None`.
// NOTE(review): the reason for `allow(dead_code)` is not visible here —
// confirm it is still needed.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) struct OverlayFragmentPayload {
    // `TypeTag` of every stored value, as raw `u8`.
    type_tags: Arc<UInt8Array>,
    // Numeric lane; read for `Number`, `DateTime` and `Duration` tags.
    numbers: Option<Arc<Float64Array>>,
    // Boolean lane; read for `Boolean` tags.
    booleans: Option<Arc<BooleanArray>>,
    // Text lane; read by downcasting to `StringArray`.
    text: Option<ArrayRef>,
    // Error-code lane; read for `Error` tags.
    errors: Option<Arc<UInt8Array>>,
    // Sum of `get_array_memory_size()` over the arrays kept, computed once at
    // construction.
    estimated_bytes: usize,
}
994
995impl OverlayFragmentPayload {
996    fn from_values(values: Vec<OverlayValue>) -> Self {
997        let len = values.len();
998        let mut tag_b = UInt8Builder::with_capacity(len);
999        let mut nb = Float64Builder::with_capacity(len);
1000        let mut bb = BooleanBuilder::with_capacity(len);
1001        let mut sb = StringBuilder::with_capacity(len, len.saturating_mul(8));
1002        let mut eb = UInt8Builder::with_capacity(len);
1003        let mut non_num = 0usize;
1004        let mut non_bool = 0usize;
1005        let mut non_text = 0usize;
1006        let mut non_err = 0usize;
1007
1008        for value in &values {
1009            append_overlay_value_to_lane_builders(
1010                value,
1011                &mut tag_b,
1012                &mut nb,
1013                &mut bb,
1014                &mut sb,
1015                &mut eb,
1016                &mut non_num,
1017                &mut non_bool,
1018                &mut non_text,
1019                &mut non_err,
1020            );
1021        }
1022
1023        let type_tags = Arc::new(tag_b.finish());
1024        let numbers = {
1025            let a = nb.finish();
1026            (non_num > 0).then(|| Arc::new(a))
1027        };
1028        let booleans = {
1029            let a = bb.finish();
1030            (non_bool > 0).then(|| Arc::new(a))
1031        };
1032        let text = {
1033            let a = sb.finish();
1034            (non_text > 0).then(|| Arc::new(a) as ArrayRef)
1035        };
1036        let errors = {
1037            let a = eb.finish();
1038            (non_err > 0).then(|| Arc::new(a))
1039        };
1040
1041        let estimated_bytes = type_tags
1042            .get_array_memory_size()
1043            .saturating_add(
1044                numbers
1045                    .as_ref()
1046                    .map(|a| a.get_array_memory_size())
1047                    .unwrap_or(0),
1048            )
1049            .saturating_add(
1050                booleans
1051                    .as_ref()
1052                    .map(|a| a.get_array_memory_size())
1053                    .unwrap_or(0),
1054            )
1055            .saturating_add(
1056                text.as_ref()
1057                    .map(|a| a.get_array_memory_size())
1058                    .unwrap_or(0),
1059            )
1060            .saturating_add(
1061                errors
1062                    .as_ref()
1063                    .map(|a| a.get_array_memory_size())
1064                    .unwrap_or(0),
1065            );
1066
1067        Self {
1068            type_tags,
1069            numbers,
1070            booleans,
1071            text,
1072            errors,
1073            estimated_bytes,
1074        }
1075    }
1076
1077    fn overlay_value(&self, idx: usize) -> Option<OverlayValue> {
1078        if idx >= self.type_tags.len() || self.type_tags.is_null(idx) {
1079            return None;
1080        }
1081        match TypeTag::from_u8(self.type_tags.value(idx)) {
1082            TypeTag::Empty => Some(OverlayValue::Empty),
1083            TypeTag::Number => Some(OverlayValue::Number(self.number_at(idx)?)),
1084            TypeTag::DateTime => Some(OverlayValue::DateTime(self.number_at(idx)?)),
1085            TypeTag::Duration => Some(OverlayValue::Duration(self.number_at(idx)?)),
1086            TypeTag::Boolean => Some(OverlayValue::Boolean(self.boolean_at(idx)?)),
1087            TypeTag::Text => Some(OverlayValue::Text(Arc::from(self.text_at(idx)?))),
1088            TypeTag::Error => Some(OverlayValue::Error(self.error_at(idx)?)),
1089            TypeTag::Pending => Some(OverlayValue::Pending),
1090        }
1091    }
1092
1093    #[inline]
1094    fn get_scalar(&self, idx: usize) -> Option<OverlayScalar<'_>> {
1095        self.overlay_value(idx).map(OverlayScalar::Owned)
1096    }
1097
1098    #[inline]
1099    fn number_at(&self, idx: usize) -> Option<f64> {
1100        let arr = self.numbers.as_ref()?;
1101        (!arr.is_null(idx)).then(|| arr.value(idx))
1102    }
1103
1104    #[inline]
1105    fn boolean_at(&self, idx: usize) -> Option<bool> {
1106        let arr = self.booleans.as_ref()?;
1107        (!arr.is_null(idx)).then(|| arr.value(idx))
1108    }
1109
1110    #[inline]
1111    fn text_at(&self, idx: usize) -> Option<&str> {
1112        let arr = self.text.as_ref()?;
1113        let arr = arr.as_any().downcast_ref::<StringArray>()?;
1114        (!arr.is_null(idx)).then(|| arr.value(idx))
1115    }
1116
1117    #[inline]
1118    fn error_at(&self, idx: usize) -> Option<u8> {
1119        let arr = self.errors.as_ref()?;
1120        (!arr.is_null(idx)).then(|| arr.value(idx))
1121    }
1122
1123    #[inline]
1124    fn values_slice(&self, start: usize, len: usize) -> Vec<OverlayValue> {
1125        (start..start.saturating_add(len))
1126            .filter_map(|idx| self.overlay_value(idx))
1127            .collect()
1128    }
1129
1130    #[inline]
1131    fn estimated_bytes(&self) -> usize {
1132        self.estimated_bytes
1133    }
1134}
/// A group of overlay values addressed by offset, in one of three layouts.
#[derive(Debug, Clone)]
pub(crate) enum OverlayFragment {
    /// Individual offsets. The `sparse_offsets` constructor produces them in
    /// ascending order without duplicates; lookups use binary search and
    /// `payload` entry `i` belongs to `offsets[i]`.
    SparseOffsets {
        offsets: Vec<u32>,
        payload: OverlayFragmentPayload,
    },
    /// Contiguous offsets `start..start + len`; `payload` entry `off - start`
    /// belongs to offset `off`.
    DenseRange {
        start: u32,
        len: u32,
        payload: OverlayFragmentPayload,
    },
    /// Contiguous offsets `start..start + len` stored as runs. `run_ends`
    /// holds the exclusive end of each run relative to `start`, and `payload`
    /// holds one value per run.
    RunRange {
        start: u32,
        len: u32,
        run_ends: Vec<u32>,
        payload: OverlayFragmentPayload,
    },
}
1153
1154impl OverlayFragment {
    /// Upper bound on the number of range segments produced when sparse
    /// offsets are cut out of a dense or run range. When the number of holes
    /// plus one exceeds this, the remainder is rebuilt as a single sparse
    /// fragment instead.
    const MAX_SPLIT_SEGMENTS_BEFORE_SPARSE_FALLBACK: usize = 128;
1156
1157    pub(crate) fn sparse_offsets(items: Vec<(usize, OverlayValue)>) -> Option<Self> {
1158        let mut by_offset: BTreeMap<usize, OverlayValue> = BTreeMap::new();
1159        for (offset, value) in items {
1160            by_offset.insert(offset, value);
1161        }
1162        if by_offset.is_empty() {
1163            return None;
1164        }
1165
1166        let mut offsets = Vec::with_capacity(by_offset.len());
1167        let mut values = Vec::with_capacity(by_offset.len());
1168        for (offset, value) in by_offset {
1169            offsets.push(u32::try_from(offset).expect("overlay offset fits in u32"));
1170            values.push(value);
1171        }
1172
1173        Some(Self::SparseOffsets {
1174            offsets,
1175            payload: OverlayFragmentPayload::from_values(values),
1176        })
1177    }
1178
1179    pub(crate) fn sparse_offsets_if_estimated_smaller_than_points(
1180        items: Vec<(usize, OverlayValue)>,
1181        point_estimate: usize,
1182    ) -> Option<Result<Self, Vec<(usize, OverlayValue)>>> {
1183        let fragment = Self::sparse_offsets(items)?;
1184        if fragment.estimated_bytes() < point_estimate {
1185            Some(Ok(fragment))
1186        } else {
1187            Some(Err(fragment.cells()))
1188        }
1189    }
1190
1191    pub(crate) fn dense_range(start: usize, values: Vec<OverlayValue>) -> Option<Self> {
1192        let len = values.len();
1193        if len == 0 {
1194            return None;
1195        }
1196        Some(Self::DenseRange {
1197            start: u32::try_from(start).expect("overlay start fits in u32"),
1198            len: u32::try_from(len).expect("overlay length fits in u32"),
1199            payload: OverlayFragmentPayload::from_values(values),
1200        })
1201    }
1202
1203    pub(crate) fn run_range(start: usize, values: Vec<OverlayValue>) -> Option<Self> {
1204        if values.is_empty() {
1205            return None;
1206        }
1207
1208        let mut run_ends = Vec::new();
1209        let mut run_values = Vec::new();
1210        let mut current = values[0].clone();
1211        for (idx, value) in values.iter().enumerate().skip(1) {
1212            if *value != current {
1213                run_ends.push(idx);
1214                run_values.push(current);
1215                current = value.clone();
1216            }
1217        }
1218        run_ends.push(values.len());
1219        run_values.push(current);
1220
1221        Self::run_range_from_parts(start, values.len(), run_ends, run_values)
1222    }
1223
1224    fn run_range_from_parts(
1225        start: usize,
1226        len: usize,
1227        run_ends: Vec<usize>,
1228        values: Vec<OverlayValue>,
1229    ) -> Option<Self> {
1230        if len == 0 || run_ends.is_empty() || run_ends.len() != values.len() {
1231            return None;
1232        }
1233
1234        let mut merged_ends: Vec<u32> = Vec::with_capacity(run_ends.len());
1235        let mut merged_values: Vec<OverlayValue> = Vec::with_capacity(values.len());
1236        let mut prev_end = 0usize;
1237        for (end, value) in run_ends.into_iter().zip(values.into_iter()) {
1238            if end <= prev_end || end > len {
1239                return None;
1240            }
1241            if merged_values.last().is_some_and(|last| *last == value) {
1242                if let Some(last_end) = merged_ends.last_mut() {
1243                    *last_end = u32::try_from(end).expect("run end fits in u32");
1244                }
1245            } else {
1246                merged_ends.push(u32::try_from(end).expect("run end fits in u32"));
1247                merged_values.push(value);
1248            }
1249            prev_end = end;
1250        }
1251
1252        if prev_end != len || merged_ends.last().copied() != Some(len as u32) {
1253            return None;
1254        }
1255
1256        Some(Self::RunRange {
1257            start: u32::try_from(start).expect("overlay start fits in u32"),
1258            len: u32::try_from(len).expect("overlay length fits in u32"),
1259            run_ends: merged_ends,
1260            payload: OverlayFragmentPayload::from_values(merged_values),
1261        })
1262    }
1263
1264    #[inline]
1265    fn estimated_bytes(&self) -> usize {
1266        match self {
1267            OverlayFragment::SparseOffsets { offsets, payload } => OVERLAY_FRAGMENT_BASE_BYTES
1268                .saturating_add(offsets.len().saturating_mul(core::mem::size_of::<u32>()))
1269                .saturating_add(payload.estimated_bytes()),
1270            OverlayFragment::DenseRange { payload, .. } => {
1271                OVERLAY_FRAGMENT_BASE_BYTES.saturating_add(payload.estimated_bytes())
1272            }
1273            OverlayFragment::RunRange {
1274                run_ends, payload, ..
1275            } => OVERLAY_FRAGMENT_BASE_BYTES
1276                .saturating_add(run_ends.len().saturating_mul(core::mem::size_of::<u32>()))
1277                .saturating_add(payload.estimated_bytes()),
1278        }
1279    }
1280
1281    #[inline]
1282    fn coverage_len(&self) -> usize {
1283        match self {
1284            OverlayFragment::SparseOffsets { offsets, .. } => offsets.len(),
1285            OverlayFragment::DenseRange { len, .. } | OverlayFragment::RunRange { len, .. } => {
1286                *len as usize
1287            }
1288        }
1289    }
1290
1291    pub(crate) fn max_covered_offset(&self) -> usize {
1292        match self {
1293            OverlayFragment::SparseOffsets { offsets, .. } => {
1294                offsets.iter().copied().max().unwrap_or(0) as usize
1295            }
1296            OverlayFragment::DenseRange { start, len, .. }
1297            | OverlayFragment::RunRange { start, len, .. } => (*start as usize)
1298                .saturating_add(*len as usize)
1299                .saturating_sub(1),
1300        }
1301    }
1302
1303    fn interval_coverage(&self) -> Option<core::ops::Range<usize>> {
1304        match self {
1305            OverlayFragment::DenseRange { start, len, .. }
1306            | OverlayFragment::RunRange { start, len, .. } => {
1307                let start = *start as usize;
1308                Some(start..start.saturating_add(*len as usize))
1309            }
1310            OverlayFragment::SparseOffsets { .. } => None,
1311        }
1312    }
1313
1314    fn sparse_offsets_slice(&self) -> Option<&[u32]> {
1315        match self {
1316            OverlayFragment::SparseOffsets { offsets, .. } => Some(offsets.as_slice()),
1317            _ => None,
1318        }
1319    }
1320
1321    fn has_any_in_range(&self, range: core::ops::Range<usize>) -> bool {
1322        if range.is_empty() {
1323            return false;
1324        }
1325        match self {
1326            OverlayFragment::SparseOffsets { offsets, .. } => {
1327                let start = u32::try_from(range.start).unwrap_or(u32::MAX);
1328                let idx = offsets.partition_point(|off| *off < start);
1329                offsets
1330                    .get(idx)
1331                    .is_some_and(|off| (*off as usize) < range.end)
1332            }
1333            OverlayFragment::DenseRange { .. } | OverlayFragment::RunRange { .. } => self
1334                .interval_coverage()
1335                .is_some_and(|r| r.start < range.end && range.start < r.end),
1336        }
1337    }
1338
1339    fn intersects_fragment_exact(&self, replacement: &OverlayFragment) -> bool {
1340        if let Some(offsets) = replacement.sparse_offsets_slice() {
1341            self.intersects_sparse_offsets(offsets)
1342        } else if let Some(range) = replacement.interval_coverage() {
1343            self.intersects_interval(range)
1344        } else {
1345            false
1346        }
1347    }
1348
1349    fn intersects_interval(&self, range: core::ops::Range<usize>) -> bool {
1350        if range.is_empty() {
1351            return false;
1352        }
1353        match self {
1354            OverlayFragment::SparseOffsets { offsets, .. } => {
1355                let start = u32::try_from(range.start).unwrap_or(u32::MAX);
1356                let idx = offsets.partition_point(|off| *off < start);
1357                offsets
1358                    .get(idx)
1359                    .is_some_and(|off| (*off as usize) < range.end)
1360            }
1361            OverlayFragment::DenseRange { .. } | OverlayFragment::RunRange { .. } => self
1362                .interval_coverage()
1363                .is_some_and(|own| own.start < range.end && range.start < own.end),
1364        }
1365    }
1366
1367    fn intersects_sparse_offsets(&self, replacement_offsets: &[u32]) -> bool {
1368        if replacement_offsets.is_empty() {
1369            return false;
1370        }
1371        match self {
1372            OverlayFragment::SparseOffsets { offsets, .. } => {
1373                Self::sorted_offsets_intersect(offsets, replacement_offsets)
1374            }
1375            OverlayFragment::DenseRange { .. } | OverlayFragment::RunRange { .. } => {
1376                self.interval_coverage().is_some_and(|range| {
1377                    let start = u32::try_from(range.start).unwrap_or(u32::MAX);
1378                    let idx = replacement_offsets.partition_point(|off| *off < start);
1379                    replacement_offsets
1380                        .get(idx)
1381                        .is_some_and(|off| (*off as usize) < range.end)
1382                })
1383            }
1384        }
1385    }
1386
1387    fn sorted_offsets_intersect(a: &[u32], b: &[u32]) -> bool {
1388        let mut ai = 0usize;
1389        let mut bi = 0usize;
1390        while ai < a.len() && bi < b.len() {
1391            match a[ai].cmp(&b[bi]) {
1392                core::cmp::Ordering::Equal => return true,
1393                core::cmp::Ordering::Less => ai += 1,
1394                core::cmp::Ordering::Greater => bi += 1,
1395            }
1396        }
1397        false
1398    }
1399
1400    fn covers_offset(&self, off: usize) -> bool {
1401        self.get_scalar(off).is_some()
1402    }
1403
1404    fn get_scalar(&self, off: usize) -> Option<OverlayScalar<'_>> {
1405        match self {
1406            OverlayFragment::SparseOffsets { offsets, payload } => {
1407                let off = u32::try_from(off).ok()?;
1408                let idx = offsets.binary_search(&off).ok()?;
1409                payload.get_scalar(idx)
1410            }
1411            OverlayFragment::DenseRange {
1412                start,
1413                len,
1414                payload,
1415            } => {
1416                let start = *start as usize;
1417                let rel = off.checked_sub(start)?;
1418                if rel >= *len as usize {
1419                    return None;
1420                }
1421                payload.get_scalar(rel)
1422            }
1423            OverlayFragment::RunRange {
1424                start,
1425                len,
1426                run_ends,
1427                payload,
1428            } => {
1429                let start = *start as usize;
1430                let rel = off.checked_sub(start)?;
1431                if rel >= *len as usize {
1432                    return None;
1433                }
1434                let rel_u32 = u32::try_from(rel).ok()?;
1435                let run_idx = run_ends.partition_point(|end| *end <= rel_u32);
1436                payload.get_scalar(run_idx)
1437            }
1438        }
1439    }
1440
1441    fn subtract_fragment(&self, replacement: &OverlayFragment) -> Vec<OverlayFragment> {
1442        if let Some(offsets) = replacement.sparse_offsets_slice() {
1443            self.subtract_sparse_offsets(offsets)
1444        } else if let Some(range) = replacement.interval_coverage() {
1445            self.subtract_interval(range)
1446        } else {
1447            vec![self.clone()]
1448        }
1449    }
1450
1451    fn subtract_offset(&self, off: usize) -> Vec<OverlayFragment> {
1452        match self {
1453            OverlayFragment::SparseOffsets { .. } => {
1454                let Ok(off) = u32::try_from(off) else {
1455                    return vec![self.clone()];
1456                };
1457                self.subtract_sparse_offsets(core::slice::from_ref(&off))
1458            }
1459            OverlayFragment::DenseRange { .. } | OverlayFragment::RunRange { .. } => {
1460                self.subtract_interval(off..off.saturating_add(1))
1461            }
1462        }
1463    }
1464
1465    fn subtract_interval(&self, replacement: core::ops::Range<usize>) -> Vec<OverlayFragment> {
1466        if replacement.is_empty() {
1467            return vec![self.clone()];
1468        }
1469
1470        match self {
1471            OverlayFragment::SparseOffsets { offsets, payload } => {
1472                let cells: Vec<_> = offsets
1473                    .iter()
1474                    .enumerate()
1475                    .filter_map(|(idx, off)| {
1476                        let off_usize = *off as usize;
1477                        (!replacement.contains(&off_usize))
1478                            .then(|| payload.overlay_value(idx).map(|value| (off_usize, value)))?
1479                    })
1480                    .collect();
1481                OverlayFragment::sparse_offsets(cells).into_iter().collect()
1482            }
1483            OverlayFragment::DenseRange { .. } => {
1484                let Some(own) = self.interval_coverage() else {
1485                    return vec![self.clone()];
1486                };
1487                if own.end <= replacement.start || replacement.end <= own.start {
1488                    return vec![self.clone()];
1489                }
1490                let cut_start = replacement.start.max(own.start);
1491                let cut_end = replacement.end.min(own.end);
1492                let mut out = Vec::with_capacity(2);
1493                if own.start < cut_start
1494                    && let Some(left) =
1495                        self.dense_segment_with_start(own.start, own.start, cut_start)
1496                {
1497                    out.push(left);
1498                }
1499                if cut_end < own.end
1500                    && let Some(right) = self.dense_segment_with_start(cut_end, cut_end, own.end)
1501                {
1502                    out.push(right);
1503                }
1504                out
1505            }
1506            OverlayFragment::RunRange { .. } => {
1507                let Some(own) = self.interval_coverage() else {
1508                    return vec![self.clone()];
1509                };
1510                if own.end <= replacement.start || replacement.end <= own.start {
1511                    return vec![self.clone()];
1512                }
1513                let cut_start = replacement.start.max(own.start);
1514                let cut_end = replacement.end.min(own.end);
1515                let mut out = Vec::with_capacity(2);
1516                if own.start < cut_start
1517                    && let Some(left) = self.run_segment_with_start(own.start, own.start, cut_start)
1518                {
1519                    out.push(left);
1520                }
1521                if cut_end < own.end
1522                    && let Some(right) = self.run_segment_with_start(cut_end, cut_end, own.end)
1523                {
1524                    out.push(right);
1525                }
1526                out
1527            }
1528        }
1529    }
1530
1531    fn subtract_sparse_offsets(&self, replacement_offsets: &[u32]) -> Vec<OverlayFragment> {
1532        if replacement_offsets.is_empty() {
1533            return vec![self.clone()];
1534        }
1535
1536        match self {
1537            OverlayFragment::SparseOffsets { offsets, payload } => {
1538                let cells: Vec<_> = offsets
1539                    .iter()
1540                    .enumerate()
1541                    .filter_map(|(idx, off)| {
1542                        replacement_offsets.binary_search(off).is_err().then(|| {
1543                            payload
1544                                .overlay_value(idx)
1545                                .map(|value| (*off as usize, value))
1546                        })?
1547                    })
1548                    .collect();
1549                OverlayFragment::sparse_offsets(cells).into_iter().collect()
1550            }
1551            OverlayFragment::DenseRange { .. } => {
1552                self.subtract_sparse_offsets_from_dense(replacement_offsets)
1553            }
1554            OverlayFragment::RunRange { .. } => {
1555                self.subtract_sparse_offsets_from_run(replacement_offsets)
1556            }
1557        }
1558    }
1559
1560    fn sparse_holes_in_interval(offsets: &[u32], range: core::ops::Range<usize>) -> Vec<usize> {
1561        if range.is_empty() {
1562            return Vec::new();
1563        }
1564        let start = u32::try_from(range.start).unwrap_or(u32::MAX);
1565        let mut idx = offsets.partition_point(|off| *off < start);
1566        let mut holes = Vec::new();
1567        let mut last = None;
1568        while let Some(off) = offsets.get(idx).copied() {
1569            let off_usize = off as usize;
1570            if off_usize >= range.end {
1571                break;
1572            }
1573            if last != Some(off_usize) {
1574                holes.push(off_usize);
1575                last = Some(off_usize);
1576            }
1577            idx += 1;
1578        }
1579        holes
1580    }
1581
1582    fn subtract_sparse_offsets_from_dense(
1583        &self,
1584        replacement_offsets: &[u32],
1585    ) -> Vec<OverlayFragment> {
1586        let Some(own) = self.interval_coverage() else {
1587            return vec![self.clone()];
1588        };
1589        let holes = Self::sparse_holes_in_interval(replacement_offsets, own.clone());
1590        if holes.is_empty() {
1591            return vec![self.clone()];
1592        }
1593        if holes.len().saturating_add(1) > Self::MAX_SPLIT_SEGMENTS_BEFORE_SPARSE_FALLBACK {
1594            return self.sparse_remainder_excluding_offsets(&holes);
1595        }
1596
1597        let mut out = Vec::with_capacity(holes.len().saturating_add(1));
1598        let mut seg_start = own.start;
1599        for hole in holes {
1600            if seg_start < hole
1601                && let Some(segment) = self.dense_segment_with_start(seg_start, seg_start, hole)
1602            {
1603                out.push(segment);
1604            }
1605            seg_start = hole.saturating_add(1);
1606        }
1607        if seg_start < own.end
1608            && let Some(segment) = self.dense_segment_with_start(seg_start, seg_start, own.end)
1609        {
1610            out.push(segment);
1611        }
1612        out
1613    }
1614
1615    fn subtract_sparse_offsets_from_run(
1616        &self,
1617        replacement_offsets: &[u32],
1618    ) -> Vec<OverlayFragment> {
1619        let Some(own) = self.interval_coverage() else {
1620            return vec![self.clone()];
1621        };
1622        let holes = Self::sparse_holes_in_interval(replacement_offsets, own.clone());
1623        if holes.is_empty() {
1624            return vec![self.clone()];
1625        }
1626        if holes.len().saturating_add(1) > Self::MAX_SPLIT_SEGMENTS_BEFORE_SPARSE_FALLBACK {
1627            return self.sparse_remainder_excluding_offsets(&holes);
1628        }
1629
1630        let mut out = Vec::with_capacity(holes.len().saturating_add(1));
1631        let mut seg_start = own.start;
1632        for hole in holes {
1633            if seg_start < hole
1634                && let Some(segment) = self.run_segment_with_start(seg_start, seg_start, hole)
1635            {
1636                out.push(segment);
1637            }
1638            seg_start = hole.saturating_add(1);
1639        }
1640        if seg_start < own.end
1641            && let Some(segment) = self.run_segment_with_start(seg_start, seg_start, own.end)
1642        {
1643            out.push(segment);
1644        }
1645        out
1646    }
1647
1648    fn sparse_remainder_excluding_offsets(&self, sorted_holes: &[usize]) -> Vec<OverlayFragment> {
1649        let cells: Vec<_> = self
1650            .cells()
1651            .into_iter()
1652            .filter(|(off, _)| sorted_holes.binary_search(off).is_err())
1653            .collect();
1654        OverlayFragment::sparse_offsets(cells).into_iter().collect()
1655    }
1656
1657    fn dense_segment_with_start(
1658        &self,
1659        new_start: usize,
1660        abs_start: usize,
1661        abs_end: usize,
1662    ) -> Option<OverlayFragment> {
1663        match self {
1664            OverlayFragment::DenseRange { start, payload, .. } => {
1665                if abs_start >= abs_end {
1666                    return None;
1667                }
1668                let base = *start as usize;
1669                let rel_start = abs_start.checked_sub(base)?;
1670                let len = abs_end.saturating_sub(abs_start);
1671                OverlayFragment::dense_range(new_start, payload.values_slice(rel_start, len))
1672            }
1673            _ => None,
1674        }
1675    }
1676
1677    fn run_segment_with_start(
1678        &self,
1679        new_start: usize,
1680        abs_start: usize,
1681        abs_end: usize,
1682    ) -> Option<OverlayFragment> {
1683        let OverlayFragment::RunRange {
1684            start,
1685            len,
1686            run_ends,
1687            payload,
1688        } = self
1689        else {
1690            return None;
1691        };
1692        if abs_start >= abs_end {
1693            return None;
1694        }
1695        let base = *start as usize;
1696        let frag_end = base.saturating_add(*len as usize);
1697        if abs_start < base || abs_end > frag_end {
1698            return None;
1699        }
1700
1701        let rel_start = abs_start - base;
1702        let rel_end = abs_end - base;
1703        let mut new_run_ends = Vec::new();
1704        let mut new_values = Vec::new();
1705        let mut prev_end = 0usize;
1706
1707        for (run_idx, end) in run_ends.iter().enumerate() {
1708            let run_start = prev_end;
1709            let run_end = *end as usize;
1710            let inter_start = run_start.max(rel_start);
1711            let inter_end = run_end.min(rel_end);
1712            if inter_start < inter_end {
1713                new_run_ends.push(inter_end - rel_start);
1714                if let Some(value) = payload.overlay_value(run_idx) {
1715                    new_values.push(value);
1716                }
1717            }
1718            prev_end = run_end;
1719            if prev_end >= rel_end {
1720                break;
1721            }
1722        }
1723
1724        OverlayFragment::run_range_from_parts(
1725            new_start,
1726            abs_end.saturating_sub(abs_start),
1727            new_run_ends,
1728            new_values,
1729        )
1730    }
1731
1732    fn cells(&self) -> Vec<(usize, OverlayValue)> {
1733        match self {
1734            OverlayFragment::SparseOffsets { offsets, payload } => offsets
1735                .iter()
1736                .enumerate()
1737                .filter_map(|(idx, off)| {
1738                    payload
1739                        .overlay_value(idx)
1740                        .map(|value| (*off as usize, value))
1741                })
1742                .collect(),
1743            OverlayFragment::DenseRange {
1744                start,
1745                len,
1746                payload,
1747            } => {
1748                let start = *start as usize;
1749                (0..*len as usize)
1750                    .filter_map(|idx| {
1751                        payload
1752                            .overlay_value(idx)
1753                            .map(|value| (start.saturating_add(idx), value))
1754                    })
1755                    .collect()
1756            }
1757            OverlayFragment::RunRange { start, len, .. } => {
1758                let start = *start as usize;
1759                (0..*len as usize)
1760                    .filter_map(|idx| {
1761                        self.get_scalar(start.saturating_add(idx))
1762                            .map(|value| (start.saturating_add(idx), value.to_overlay_value()))
1763                    })
1764                    .collect()
1765            }
1766        }
1767    }
1768
1769    fn slice(&self, off: usize, len: usize) -> Option<OverlayFragment> {
1770        let end = off.saturating_add(len);
1771        if len == 0 {
1772            return None;
1773        }
1774
1775        match self {
1776            OverlayFragment::SparseOffsets { offsets, payload } => {
1777                let start = u32::try_from(off).unwrap_or(u32::MAX);
1778                let lo = offsets.partition_point(|candidate| *candidate < start);
1779                let hi = offsets.partition_point(|candidate| (*candidate as usize) < end);
1780                let cells: Vec<_> = (lo..hi)
1781                    .filter_map(|idx| {
1782                        let rebased = (offsets[idx] as usize).saturating_sub(off);
1783                        payload.overlay_value(idx).map(|value| (rebased, value))
1784                    })
1785                    .collect();
1786                OverlayFragment::sparse_offsets(cells)
1787            }
1788            OverlayFragment::DenseRange { .. } => {
1789                let own = self.interval_coverage()?;
1790                let seg_start = own.start.max(off);
1791                let seg_end = own.end.min(end);
1792                if seg_start >= seg_end {
1793                    return None;
1794                }
1795                self.dense_segment_with_start(seg_start - off, seg_start, seg_end)
1796            }
1797            OverlayFragment::RunRange { .. } => {
1798                let own = self.interval_coverage()?;
1799                let seg_start = own.start.max(off);
1800                let seg_end = own.end.min(end);
1801                if seg_start >= seg_end {
1802                    return None;
1803                }
1804                self.run_segment_with_start(seg_start - off, seg_start, seg_end)
1805            }
1806        }
1807    }
1808}
/// Override layer made of individual point entries plus bulk fragments.
///
/// Scalar lookups consult `points` first and then `fragments`, newest first.
#[derive(Debug, Default, Clone)]
pub struct Overlay {
    /// Single-cell overrides keyed by offset.
    points: HashMap<usize, OverlayValue>,
    /// Bulk overrides in application order (later entries are searched first).
    fragments: Vec<OverlayFragment>,
    // Deterministic (and intentionally approximate) accounting of overlay memory.
    // This is used for budget enforcement/observability; it does not attempt to reflect
    // the allocator's exact overhead.
    estimated_bytes: usize,
}
1818
1819impl Overlay {
1820    // Deterministic estimate per entry to keep budget enforcement stable across platforms.
1821    // Includes key + map/node overhead (approx) and value payload bytes.
1822    const ENTRY_BASE_BYTES: usize = OVERLAY_ENTRY_BASE_BYTES;
1823
1824    pub fn new() -> Self {
1825        Self {
1826            points: HashMap::new(),
1827            fragments: Vec::new(),
1828            estimated_bytes: 0,
1829        }
1830    }
1831
1832    #[inline]
1833    fn point_estimate(v: &OverlayValue) -> usize {
1834        Self::ENTRY_BASE_BYTES + v.estimated_payload_bytes()
1835    }
1836
1837    #[inline]
1838    fn adjust_estimated_bytes(&mut self, delta: isize) {
1839        if delta >= 0 {
1840            self.estimated_bytes = self.estimated_bytes.saturating_add(delta as usize);
1841        } else {
1842            self.estimated_bytes = self.estimated_bytes.saturating_sub((-delta) as usize);
1843        }
1844    }
1845
1846    #[inline]
1847    pub(crate) fn get_scalar(&self, off: usize) -> Option<OverlayScalar<'_>> {
1848        self.points
1849            .get(&off)
1850            .map(OverlayScalar::Borrowed)
1851            .or_else(|| self.fragments.iter().rev().find_map(|f| f.get_scalar(off)))
1852    }
1853
1854    #[inline]
1855    pub fn get(&self, off: usize) -> Option<OverlayValue> {
1856        self.get_scalar(off).map(|value| value.to_overlay_value())
1857    }
1858
1859    #[inline]
1860    pub(crate) fn set_scalar(&mut self, off: usize, v: OverlayValue) -> isize {
1861        let removed = self.remove_scalar(off);
1862        let new_est = Self::point_estimate(&v);
1863        self.points.insert(off, v);
1864        self.adjust_estimated_bytes(new_est as isize);
1865        removed.saturating_add(new_est as isize)
1866    }
1867
    /// Public entry point for `set_scalar`: stores `v` at `off` and returns the
    /// net change in estimated bytes.
    #[inline]
    pub fn set(&mut self, off: usize, v: OverlayValue) -> isize {
        self.set_scalar(off, v)
    }
1872
1873    pub(crate) fn apply_fragment(&mut self, fragment: OverlayFragment) -> isize {
1874        let mut delta = self.remove_points_covered_by_fragment(&fragment);
1875        delta = delta.saturating_add(self.remove_fragments_covered_by_fragment(&fragment));
1876
1877        let fragment_est = fragment.estimated_bytes();
1878        self.fragments.push(fragment);
1879        self.adjust_estimated_bytes(fragment_est as isize);
1880        delta.saturating_add(fragment_est as isize)
1881    }
1882
1883    fn remove_points_covered_by_fragment(&mut self, fragment: &OverlayFragment) -> isize {
1884        let mut removed = 0usize;
1885        match fragment {
1886            OverlayFragment::SparseOffsets { offsets, .. } => {
1887                for off in offsets.iter().copied() {
1888                    if let Some(old) = self.points.remove(&(off as usize)) {
1889                        removed = removed.saturating_add(Self::point_estimate(&old));
1890                    }
1891                }
1892            }
1893            OverlayFragment::DenseRange { .. } | OverlayFragment::RunRange { .. } => {
1894                if let Some(range) = fragment.interval_coverage() {
1895                    let keys: Vec<_> = self
1896                        .points
1897                        .keys()
1898                        .copied()
1899                        .filter(|off| range.contains(off))
1900                        .collect();
1901                    for off in keys {
1902                        if let Some(old) = self.points.remove(&off) {
1903                            removed = removed.saturating_add(Self::point_estimate(&old));
1904                        }
1905                    }
1906                }
1907            }
1908        }
1909        self.estimated_bytes = self.estimated_bytes.saturating_sub(removed);
1910        -(removed as isize)
1911    }
1912
1913    fn remove_fragments_covered_by_fragment(&mut self, replacement: &OverlayFragment) -> isize {
1914        if self.fragments.is_empty() {
1915            return 0;
1916        }
1917
1918        let mut delta: isize = 0;
1919        let mut fragments = Vec::with_capacity(self.fragments.len());
1920        for fragment in self.fragments.drain(..) {
1921            if !fragment.intersects_fragment_exact(replacement) {
1922                fragments.push(fragment);
1923                continue;
1924            }
1925
1926            let old_est = fragment.estimated_bytes();
1927            let replacements = fragment.subtract_fragment(replacement);
1928            let new_est = replacements
1929                .iter()
1930                .map(OverlayFragment::estimated_bytes)
1931                .fold(0usize, usize::saturating_add);
1932            fragments.extend(replacements);
1933            delta = delta.saturating_add(new_est as isize - old_est as isize);
1934        }
1935        self.fragments = fragments;
1936        self.adjust_estimated_bytes(delta);
1937        delta
1938    }
1939
1940    #[inline]
1941    pub(crate) fn remove_scalar(&mut self, off: usize) -> isize {
1942        let mut delta = 0isize;
1943        if let Some(old) = self.points.remove(&off) {
1944            let old_est = Self::point_estimate(&old);
1945            self.estimated_bytes = self.estimated_bytes.saturating_sub(old_est);
1946            delta = delta.saturating_sub(old_est as isize);
1947        }
1948
1949        if !self.fragments.is_empty() {
1950            let mut fragments = Vec::with_capacity(self.fragments.len());
1951            for fragment in self.fragments.drain(..) {
1952                if fragment.get_scalar(off).is_none() {
1953                    fragments.push(fragment);
1954                    continue;
1955                }
1956
1957                let old_est = fragment.estimated_bytes();
1958                let replacements = fragment.subtract_offset(off);
1959                let new_est = replacements
1960                    .iter()
1961                    .map(OverlayFragment::estimated_bytes)
1962                    .fold(0usize, usize::saturating_add);
1963                fragments.extend(replacements);
1964                delta = delta.saturating_add(new_est as isize - old_est as isize);
1965            }
1966            self.fragments = fragments;
1967            self.adjust_estimated_bytes(delta);
1968        }
1969
1970        delta
1971    }
1972
    /// Public entry point for `remove_scalar`: clears the cell at `off` and
    /// returns the change in estimated bytes.
    #[inline]
    pub fn remove(&mut self, off: usize) -> isize {
        self.remove_scalar(off)
    }
1977
1978    #[inline]
1979    pub(crate) fn clear_all(&mut self) -> usize {
1980        let freed = self.estimated_bytes;
1981        self.points.clear();
1982        self.fragments.clear();
1983        self.estimated_bytes = 0;
1984        freed
1985    }
1986
    /// Public entry point for `clear_all`: empties the overlay and returns the
    /// previously estimated byte count.
    #[inline]
    pub fn clear(&mut self) -> usize {
        self.clear_all()
    }
1991
1992    #[inline]
1993    pub fn len(&self) -> usize {
1994        self.points.len().saturating_add(
1995            self.fragments
1996                .iter()
1997                .map(OverlayFragment::coverage_len)
1998                .sum(),
1999        )
2000    }
2001
    /// Current approximate memory footprint tracked for this overlay, in bytes.
    #[inline]
    pub fn estimated_bytes(&self) -> usize {
        self.estimated_bytes
    }
2006
    /// Returns `true` when the overlay holds neither points nor fragments.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.points.is_empty() && self.fragments.is_empty()
    }
2011
2012    #[inline]
2013    pub(crate) fn has_any_in_range(&self, range: core::ops::Range<usize>) -> bool {
2014        self.points.keys().any(|k| range.contains(k))
2015            || self
2016                .fragments
2017                .iter()
2018                .any(|fragment| fragment.has_any_in_range(range.clone()))
2019    }
2020
    /// Public entry point for `has_any_in_range`.
    #[inline]
    pub fn any_in_range(&self, range: core::ops::Range<usize>) -> bool {
        self.has_any_in_range(range)
    }
2025
2026    pub(crate) fn slice(&self, off: usize, len: usize) -> Overlay {
2027        let mut out = Overlay::new();
2028        let end = off.saturating_add(len);
2029        for fragment in &self.fragments {
2030            if let Some(sliced) = fragment.slice(off, len) {
2031                let _ = out.apply_fragment(sliced);
2032            }
2033        }
2034        for (k, v) in self.points.iter() {
2035            if *k >= off && *k < end {
2036                let _ = out.set_scalar(*k - off, v.clone());
2037            }
2038        }
2039        out
2040    }
2041
2042    /// Iterate over logical `(offset, value)` pairs in the overlay.
2043    pub fn iter(&self) -> impl Iterator<Item = (usize, OverlayValue)> {
2044        let mut cells = BTreeMap::new();
2045        for fragment in &self.fragments {
2046            for (off, value) in fragment.cells() {
2047                cells.insert(off, value);
2048            }
2049        }
2050        for (off, value) in &self.points {
2051            cells.insert(*off, value.clone());
2052        }
2053        cells.into_iter()
2054    }
2055
    /// Iterate over physical point entries only.
    ///
    /// Fragment contents are not included; iteration order is that of the
    /// underlying hash map.
    pub(crate) fn iter_points(&self) -> impl Iterator<Item = (&usize, &OverlayValue)> {
        self.points.iter()
    }
2060}
2061
/// Test-only snapshot of an overlay's physical layout, produced by
/// `Overlay::debug_stats`.
#[cfg(test)]
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub(crate) struct OverlayDebugStats {
    /// Number of point entries.
    pub(crate) points: usize,
    /// Number of `SparseOffsets` fragments.
    pub(crate) sparse_fragments: usize,
    /// Number of `DenseRange` fragments.
    pub(crate) dense_fragments: usize,
    /// Number of `RunRange` fragments.
    pub(crate) run_fragments: usize,
    /// Value of `Overlay::len()` at snapshot time.
    pub(crate) covered_len: usize,
}
2071
2072#[cfg(test)]
2073impl Overlay {
2074    pub(crate) fn debug_stats(&self) -> OverlayDebugStats {
2075        let mut stats = OverlayDebugStats {
2076            points: self.points.len(),
2077            covered_len: self.len(),
2078            ..OverlayDebugStats::default()
2079        };
2080        for fragment in &self.fragments {
2081            match fragment {
2082                OverlayFragment::SparseOffsets { .. } => stats.sparse_fragments += 1,
2083                OverlayFragment::DenseRange { .. } => stats.dense_fragments += 1,
2084                OverlayFragment::RunRange { .. } => stats.run_fragments += 1,
2085            }
2086        }
2087        stats
2088    }
2089
2090    pub(crate) fn debug_is_normalized(&self) -> bool {
2091        let mut covered = std::collections::HashSet::new();
2092        for off in self.points.keys().copied() {
2093            if !covered.insert(off) {
2094                return false;
2095            }
2096        }
2097        for fragment in &self.fragments {
2098            for (off, _) in fragment.cells() {
2099                if !covered.insert(off) {
2100                    return false;
2101                }
2102            }
2103        }
2104        covered.len() == self.len()
2105    }
2106
2107    pub(crate) fn debug_recomputed_estimated_bytes(&self) -> usize {
2108        let point_bytes = self
2109            .points
2110            .values()
2111            .map(Self::point_estimate)
2112            .fold(0usize, usize::saturating_add);
2113        let fragment_bytes = self
2114            .fragments
2115            .iter()
2116            .map(OverlayFragment::estimated_bytes)
2117            .fold(0usize, usize::saturating_add);
2118        point_bytes.saturating_add(fragment_bytes)
2119    }
2120}
2121
/// Counters describing which code paths the overlay `select_*` routines took.
///
/// Updated through `record_overlay_select_stats`, which is a no-op outside of
/// test builds.
#[derive(Debug, Clone, Copy, Default)]
#[cfg_attr(test, derive(serde::Serialize))]
pub(crate) struct OverlaySelectStats {
    /// Incremented right before a partial overlay is merged via `zip_select`.
    pub(crate) zip_select_calls: usize,
    /// Incremented when a fully covering dense fragment is returned directly.
    pub(crate) direct_dense_slices: usize,
    /// Incremented when a fully covering run fragment is materialized directly.
    pub(crate) direct_run_materializations: usize,
    pub(crate) partial_sparse_intersections: usize,
    pub(crate) partial_dense_intersections: usize,
    pub(crate) partial_run_intersections: usize,
    /// Incremented when the slot-based partial merge path is entered.
    pub(crate) partial_overlay_builds: usize,
    pub(crate) row_scalar_fallbacks: usize,
    pub(crate) point_entries_applied: usize,
    pub(crate) fragment_intersections: usize,
}
2136
// Per-thread counters, so each test thread observes only its own selections.
#[cfg(test)]
thread_local! {
    static OVERLAY_SELECT_STATS: std::cell::RefCell<OverlaySelectStats> =
        std::cell::RefCell::new(OverlaySelectStats::default());
}
2142
/// Resets the current thread's selection counters to their defaults.
#[cfg(test)]
pub(crate) fn reset_overlay_select_stats() {
    OVERLAY_SELECT_STATS.with(|stats| {
        stats.replace(OverlaySelectStats::default());
    });
}
2147
/// Returns a copy of the current thread's selection counters.
#[cfg(test)]
pub(crate) fn snapshot_overlay_select_stats() -> OverlaySelectStats {
    OVERLAY_SELECT_STATS.with(|stats| {
        let current = stats.borrow();
        *current
    })
}
2152
/// Runs `f` against the current thread's selection counters (test builds only).
#[cfg(test)]
fn record_overlay_select_stats(f: impl FnOnce(&mut OverlaySelectStats)) {
    OVERLAY_SELECT_STATS.with(|cell| {
        let mut counters = cell.borrow_mut();
        f(&mut counters)
    });
}
2157
/// Non-test stand-in for the stats recorder: the closure is dropped without
/// being called, so instrumentation costs nothing outside of tests.
#[cfg(not(test))]
#[inline]
fn record_overlay_select_stats(_f: impl FnOnce(&mut OverlaySelectStats)) {}
2161
/// Payload-free tag for a fragment encoding.
///
/// NOTE(review): presumably mirrors the `SparseOffsets` / `DenseRange` /
/// `RunRange` variants of `OverlayFragment`; the usage is outside this view.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum OverlayFragmentShape {
    Sparse,
    Dense,
    Run,
}
2168
/// Fixed-length scratch buffer used while merging overlay layers.
///
/// `present[i]` records whether any layer wrote row `i`; `values[i]` holds the
/// value written last (`None` stands for an explicit null).
struct OverlaySlots<T> {
    present: Vec<bool>,
    values: Vec<Option<T>>,
    any_present: bool,
}

impl<T> OverlaySlots<T> {
    /// Creates `len` unwritten slots.
    fn new(len: usize) -> Self {
        let mut values = Vec::with_capacity(len);
        values.resize_with(len, || None);
        Self {
            present: vec![false; len],
            values,
            any_present: false,
        }
    }

    /// Writes `value` into slot `idx`, replacing any earlier write.
    /// Out-of-range indices are ignored.
    #[inline]
    fn set(&mut self, idx: usize, value: Option<T>) {
        let slot = (self.present.get_mut(idx), self.values.get_mut(idx));
        if let (Some(flag), Some(cell)) = slot {
            *flag = true;
            *cell = value;
            self.any_present = true;
        }
    }

    /// Returns `true` once at least one in-range slot has been written.
    #[inline]
    fn any_present(&self) -> bool {
        self.any_present
    }
}
2199
/// Read-only view over two overlays in which `user` entries take precedence
/// over `computed` entries.
pub(crate) struct OverlayCascade<'a> {
    user: &'a Overlay,
    computed: &'a Overlay,
}
2204
2205impl<'a> OverlayCascade<'a> {
    /// Pairs a user overlay with a computed overlay without copying either.
    #[inline]
    pub(crate) fn new(user: &'a Overlay, computed: &'a Overlay) -> Self {
        Self { user, computed }
    }
2210
2211    #[inline]
2212    pub(crate) fn get_scalar(&self, off: usize) -> Option<OverlayScalar<'a>> {
2213        self.user
2214            .get_scalar(off)
2215            .or_else(|| self.computed.get_scalar(off))
2216    }
2217
2218    #[inline]
2219    pub(crate) fn has_any_in_range(&self, range: core::ops::Range<usize>) -> bool {
2220        self.user.has_any_in_range(range.clone()) || self.computed.has_any_in_range(range)
2221    }
2222
2223    pub(crate) fn select_numbers(
2224        &self,
2225        range: core::ops::Range<usize>,
2226        base: &Float64Array,
2227    ) -> Arc<Float64Array> {
2228        if let Some(fragment) = self.user.full_cover_dense_fragment(range.clone()) {
2229            record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2230            return Self::dense_numbers(fragment, range);
2231        }
2232        if let Some(fragment) = self.user.full_cover_run_fragment(range.clone()) {
2233            record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2234            return Self::run_numbers(fragment, range);
2235        }
2236        if !self.user.has_any_in_range(range.clone()) {
2237            if let Some(fragment) = self.computed.full_cover_dense_fragment(range.clone()) {
2238                record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2239                return Self::dense_numbers(fragment, range);
2240            }
2241            if let Some(fragment) = self.computed.full_cover_run_fragment(range.clone()) {
2242                record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2243                return Self::run_numbers(fragment, range);
2244            }
2245        }
2246
2247        if !self.has_any_in_range(range.clone()) {
2248            return Arc::new(base.clone());
2249        }
2250
2251        record_overlay_select_stats(|stats| stats.partial_overlay_builds += 1);
2252        let len = range.end.saturating_sub(range.start);
2253        let mut slots = OverlaySlots::<f64>::new(len);
2254        Self::apply_number_layer(self.computed, range.clone(), &mut slots);
2255        Self::apply_number_layer(self.user, range.clone(), &mut slots);
2256        if !slots.any_present() {
2257            return Arc::new(base.clone());
2258        }
2259
2260        let mut mask_b = BooleanBuilder::with_capacity(len);
2261        let mut values_b = Float64Builder::with_capacity(len);
2262        for idx in 0..len {
2263            mask_b.append_value(slots.present[idx]);
2264            match slots.values[idx] {
2265                Some(value) => values_b.append_value(value),
2266                None => values_b.append_null(),
2267            }
2268        }
2269        record_overlay_select_stats(|stats| stats.zip_select_calls += 1);
2270        let mask = mask_b.finish();
2271        let values = values_b.finish();
2272        let zipped =
2273            crate::compute_prelude::zip_select(&mask, &values, base).expect("zip numeric overlay");
2274        Arc::new(
2275            zipped
2276                .as_any()
2277                .downcast_ref::<Float64Array>()
2278                .expect("numeric overlay zip type")
2279                .clone(),
2280        )
2281    }
2282
2283    pub(crate) fn select_booleans(
2284        &self,
2285        range: core::ops::Range<usize>,
2286        base: &BooleanArray,
2287    ) -> Arc<BooleanArray> {
2288        if let Some(fragment) = self.user.full_cover_dense_fragment(range.clone()) {
2289            record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2290            return Self::dense_booleans(fragment, range);
2291        }
2292        if let Some(fragment) = self.user.full_cover_run_fragment(range.clone()) {
2293            record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2294            return Self::run_booleans(fragment, range);
2295        }
2296        if !self.user.has_any_in_range(range.clone()) {
2297            if let Some(fragment) = self.computed.full_cover_dense_fragment(range.clone()) {
2298                record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2299                return Self::dense_booleans(fragment, range);
2300            }
2301            if let Some(fragment) = self.computed.full_cover_run_fragment(range.clone()) {
2302                record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2303                return Self::run_booleans(fragment, range);
2304            }
2305        }
2306
2307        if !self.has_any_in_range(range.clone()) {
2308            return Arc::new(base.clone());
2309        }
2310
2311        record_overlay_select_stats(|stats| stats.partial_overlay_builds += 1);
2312        let len = range.end.saturating_sub(range.start);
2313        let mut slots = OverlaySlots::<bool>::new(len);
2314        Self::apply_boolean_layer(self.computed, range.clone(), &mut slots);
2315        Self::apply_boolean_layer(self.user, range.clone(), &mut slots);
2316        if !slots.any_present() {
2317            return Arc::new(base.clone());
2318        }
2319
2320        let mut mask_b = BooleanBuilder::with_capacity(len);
2321        let mut values_b = BooleanBuilder::with_capacity(len);
2322        for idx in 0..len {
2323            mask_b.append_value(slots.present[idx]);
2324            match slots.values[idx] {
2325                Some(value) => values_b.append_value(value),
2326                None => values_b.append_null(),
2327            }
2328        }
2329        record_overlay_select_stats(|stats| stats.zip_select_calls += 1);
2330        let mask = mask_b.finish();
2331        let values = values_b.finish();
2332        let zipped =
2333            crate::compute_prelude::zip_select(&mask, &values, base).expect("zip boolean overlay");
2334        Arc::new(
2335            zipped
2336                .as_any()
2337                .downcast_ref::<BooleanArray>()
2338                .expect("boolean overlay zip type")
2339                .clone(),
2340        )
2341    }
2342
2343    pub(crate) fn select_text(
2344        &self,
2345        range: core::ops::Range<usize>,
2346        base: &StringArray,
2347    ) -> ArrayRef {
2348        if let Some(fragment) = self.user.full_cover_dense_fragment(range.clone()) {
2349            record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2350            return Self::dense_text(fragment, range);
2351        }
2352        if let Some(fragment) = self.user.full_cover_run_fragment(range.clone()) {
2353            record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2354            return Self::run_text(fragment, range);
2355        }
2356        if !self.user.has_any_in_range(range.clone()) {
2357            if let Some(fragment) = self.computed.full_cover_dense_fragment(range.clone()) {
2358                record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2359                return Self::dense_text(fragment, range);
2360            }
2361            if let Some(fragment) = self.computed.full_cover_run_fragment(range.clone()) {
2362                record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2363                return Self::run_text(fragment, range);
2364            }
2365        }
2366
2367        if !self.has_any_in_range(range.clone()) {
2368            return Arc::new(base.clone()) as ArrayRef;
2369        }
2370
2371        record_overlay_select_stats(|stats| stats.partial_overlay_builds += 1);
2372        let len = range.end.saturating_sub(range.start);
2373        let mut slots = OverlaySlots::<String>::new(len);
2374        Self::apply_text_layer(self.computed, range.clone(), &mut slots);
2375        Self::apply_text_layer(self.user, range.clone(), &mut slots);
2376        if !slots.any_present() {
2377            return Arc::new(base.clone()) as ArrayRef;
2378        }
2379
2380        let mut mask_b = BooleanBuilder::with_capacity(len);
2381        let mut values_b = StringBuilder::with_capacity(len, len.saturating_mul(8));
2382        for idx in 0..len {
2383            mask_b.append_value(slots.present[idx]);
2384            match &slots.values[idx] {
2385                Some(value) => values_b.append_value(value),
2386                None => values_b.append_null(),
2387            }
2388        }
2389        record_overlay_select_stats(|stats| stats.zip_select_calls += 1);
2390        let mask = mask_b.finish();
2391        let values = values_b.finish();
2392        crate::compute_prelude::zip_select(&mask, &values, base).expect("zip text overlay")
2393    }
2394
2395    pub(crate) fn select_errors(
2396        &self,
2397        range: core::ops::Range<usize>,
2398        base: &UInt8Array,
2399    ) -> Arc<UInt8Array> {
2400        if let Some(fragment) = self.user.full_cover_dense_fragment(range.clone()) {
2401            record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2402            return Self::dense_errors(fragment, range);
2403        }
2404        if let Some(fragment) = self.user.full_cover_run_fragment(range.clone()) {
2405            record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2406            return Self::run_errors(fragment, range);
2407        }
2408        if !self.user.has_any_in_range(range.clone()) {
2409            if let Some(fragment) = self.computed.full_cover_dense_fragment(range.clone()) {
2410                record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2411                return Self::dense_errors(fragment, range);
2412            }
2413            if let Some(fragment) = self.computed.full_cover_run_fragment(range.clone()) {
2414                record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2415                return Self::run_errors(fragment, range);
2416            }
2417        }
2418
2419        if !self.has_any_in_range(range.clone()) {
2420            return Arc::new(base.clone());
2421        }
2422
2423        record_overlay_select_stats(|stats| stats.partial_overlay_builds += 1);
2424        let len = range.end.saturating_sub(range.start);
2425        let mut slots = OverlaySlots::<u8>::new(len);
2426        Self::apply_error_layer(self.computed, range.clone(), &mut slots);
2427        Self::apply_error_layer(self.user, range.clone(), &mut slots);
2428        if !slots.any_present() {
2429            return Arc::new(base.clone());
2430        }
2431
2432        let mut mask_b = BooleanBuilder::with_capacity(len);
2433        let mut values_b = UInt8Builder::with_capacity(len);
2434        for idx in 0..len {
2435            mask_b.append_value(slots.present[idx]);
2436            match slots.values[idx] {
2437                Some(value) => values_b.append_value(value),
2438                None => values_b.append_null(),
2439            }
2440        }
2441        record_overlay_select_stats(|stats| stats.zip_select_calls += 1);
2442        let mask = mask_b.finish();
2443        let values = values_b.finish();
2444        let zipped =
2445            crate::compute_prelude::zip_select(&mask, &values, base).expect("zip error overlay");
2446        Arc::new(
2447            zipped
2448                .as_any()
2449                .downcast_ref::<UInt8Array>()
2450                .expect("error overlay zip type")
2451                .clone(),
2452        )
2453    }
2454
2455    pub(crate) fn select_type_tags(
2456        &self,
2457        range: core::ops::Range<usize>,
2458        base: &UInt8Array,
2459    ) -> Arc<UInt8Array> {
2460        if let Some(fragment) = self.user.full_cover_dense_fragment(range.clone()) {
2461            record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2462            return Self::dense_type_tags(fragment, range);
2463        }
2464        if let Some(fragment) = self.user.full_cover_run_fragment(range.clone()) {
2465            record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2466            return Self::run_type_tags(fragment, range);
2467        }
2468        if !self.user.has_any_in_range(range.clone()) {
2469            if let Some(fragment) = self.computed.full_cover_dense_fragment(range.clone()) {
2470                record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2471                return Self::dense_type_tags(fragment, range);
2472            }
2473            if let Some(fragment) = self.computed.full_cover_run_fragment(range.clone()) {
2474                record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2475                return Self::run_type_tags(fragment, range);
2476            }
2477        }
2478
2479        if !self.has_any_in_range(range.clone()) {
2480            return Arc::new(base.clone());
2481        }
2482
2483        record_overlay_select_stats(|stats| stats.partial_overlay_builds += 1);
2484        let len = range.end.saturating_sub(range.start);
2485        let mut slots = OverlaySlots::<u8>::new(len);
2486        Self::apply_type_tag_layer(self.computed, range.clone(), &mut slots);
2487        Self::apply_type_tag_layer(self.user, range.clone(), &mut slots);
2488        if !slots.any_present() {
2489            return Arc::new(base.clone());
2490        }
2491
2492        let mut mask_b = BooleanBuilder::with_capacity(len);
2493        let mut values_b = UInt8Builder::with_capacity(len);
2494        for idx in 0..len {
2495            mask_b.append_value(slots.present[idx]);
2496            match slots.values[idx] {
2497                Some(value) => values_b.append_value(value),
2498                None => values_b.append_null(),
2499            }
2500        }
2501        record_overlay_select_stats(|stats| stats.zip_select_calls += 1);
2502        let mask = mask_b.finish();
2503        let values = values_b.finish();
2504        let zipped =
2505            crate::compute_prelude::zip_select(&mask, &values, base).expect("zip type-tag overlay");
2506        Arc::new(
2507            zipped
2508                .as_any()
2509                .downcast_ref::<UInt8Array>()
2510                .expect("type-tag overlay zip type")
2511                .clone(),
2512        )
2513    }
2514
2515    pub(crate) fn select_lowered_text(
2516        &self,
2517        range: core::ops::Range<usize>,
2518        base: &StringArray,
2519    ) -> Arc<StringArray> {
2520        if let Some(fragment) = self.user.full_cover_dense_fragment(range.clone()) {
2521            record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2522            return Self::dense_lowered_text(fragment, range);
2523        }
2524        if let Some(fragment) = self.user.full_cover_run_fragment(range.clone()) {
2525            record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2526            return Self::run_lowered_text(fragment, range);
2527        }
2528        if !self.user.has_any_in_range(range.clone()) {
2529            if let Some(fragment) = self.computed.full_cover_dense_fragment(range.clone()) {
2530                record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2531                return Self::dense_lowered_text(fragment, range);
2532            }
2533            if let Some(fragment) = self.computed.full_cover_run_fragment(range.clone()) {
2534                record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2535                return Self::run_lowered_text(fragment, range);
2536            }
2537        }
2538
2539        if !self.has_any_in_range(range.clone()) {
2540            return Arc::new(base.clone());
2541        }
2542        if self.user.fragments.is_empty() && self.computed.fragments.is_empty() {
2543            return self.select_lowered_text_point_scalar(range, base);
2544        }
2545
2546        record_overlay_select_stats(|stats| stats.partial_overlay_builds += 1);
2547        let len = range.end.saturating_sub(range.start);
2548        let mut slots = OverlaySlots::<String>::new(len);
2549        Self::apply_lowered_text_layer(self.computed, range.clone(), &mut slots);
2550        Self::apply_lowered_text_layer(self.user, range.clone(), &mut slots);
2551        if !slots.any_present() {
2552            return Arc::new(base.clone());
2553        }
2554
2555        let mut mask_b = BooleanBuilder::with_capacity(len);
2556        let mut values_b = StringBuilder::with_capacity(len, len.saturating_mul(8));
2557        for idx in 0..len {
2558            mask_b.append_value(slots.present[idx]);
2559            match &slots.values[idx] {
2560                Some(value) => values_b.append_value(value),
2561                None => values_b.append_null(),
2562            }
2563        }
2564        record_overlay_select_stats(|stats| stats.zip_select_calls += 1);
2565        let mask = mask_b.finish();
2566        let values = values_b.finish();
2567        let zipped = crate::compute_prelude::zip_select(&mask, &values, base)
2568            .expect("zip lowered text overlay");
2569        Arc::new(
2570            zipped
2571                .as_any()
2572                .downcast_ref::<StringArray>()
2573                .expect("lowered text overlay zip type")
2574                .clone(),
2575        )
2576    }
2577
2578    fn select_lowered_text_point_scalar(
2579        &self,
2580        range: core::ops::Range<usize>,
2581        base: &StringArray,
2582    ) -> Arc<StringArray> {
2583        let len = range.end.saturating_sub(range.start);
2584        let mut mask_b = BooleanBuilder::with_capacity(len);
2585        let mut values_b = StringBuilder::with_capacity(len, len.saturating_mul(8));
2586        record_overlay_select_stats(|stats| stats.row_scalar_fallbacks += len);
2587        for off in range {
2588            if let Some(value) = self.get_scalar(off) {
2589                mask_b.append_value(true);
2590                if let Some(s) = value.lowered_text_value() {
2591                    values_b.append_value(&s);
2592                } else {
2593                    values_b.append_null();
2594                }
2595                record_overlay_select_stats(|stats| stats.point_entries_applied += 1);
2596            } else {
2597                mask_b.append_value(false);
2598                values_b.append_null();
2599            }
2600        }
2601        record_overlay_select_stats(|stats| stats.zip_select_calls += 1);
2602        let mask = mask_b.finish();
2603        let values = values_b.finish();
2604        let zipped = crate::compute_prelude::zip_select(&mask, &values, base)
2605            .expect("zip lowered text overlay");
2606        Arc::new(
2607            zipped
2608                .as_any()
2609                .downcast_ref::<StringArray>()
2610                .expect("lowered text overlay zip type")
2611                .clone(),
2612        )
2613    }
2614
2615    fn dense_numbers(
2616        fragment: &OverlayFragment,
2617        range: core::ops::Range<usize>,
2618    ) -> Arc<Float64Array> {
2619        let (rel_start, len, payload) = Self::dense_payload_window(fragment, range);
2620        Self::payload_numbers_slice(payload, rel_start, len)
2621    }
2622
2623    fn dense_booleans(
2624        fragment: &OverlayFragment,
2625        range: core::ops::Range<usize>,
2626    ) -> Arc<BooleanArray> {
2627        let (rel_start, len, payload) = Self::dense_payload_window(fragment, range);
2628        Self::payload_booleans_slice(payload, rel_start, len)
2629    }
2630
2631    fn dense_text(fragment: &OverlayFragment, range: core::ops::Range<usize>) -> ArrayRef {
2632        let (rel_start, len, payload) = Self::dense_payload_window(fragment, range);
2633        Self::payload_text_slice(payload, rel_start, len)
2634    }
2635
2636    fn dense_errors(fragment: &OverlayFragment, range: core::ops::Range<usize>) -> Arc<UInt8Array> {
2637        let (rel_start, len, payload) = Self::dense_payload_window(fragment, range);
2638        Self::payload_errors_slice(payload, rel_start, len)
2639    }
2640
2641    fn dense_type_tags(
2642        fragment: &OverlayFragment,
2643        range: core::ops::Range<usize>,
2644    ) -> Arc<UInt8Array> {
2645        let (rel_start, len, payload) = Self::dense_payload_window(fragment, range);
2646        Self::payload_type_tags_slice(payload, rel_start, len)
2647    }
2648
2649    fn dense_lowered_text(
2650        fragment: &OverlayFragment,
2651        range: core::ops::Range<usize>,
2652    ) -> Arc<StringArray> {
2653        let (rel_start, len, payload) = Self::dense_payload_window(fragment, range);
2654        Self::payload_lowered_text_materialize(payload, rel_start, len)
2655    }
2656
2657    fn dense_payload_window(
2658        fragment: &OverlayFragment,
2659        range: core::ops::Range<usize>,
2660    ) -> (usize, usize, &OverlayFragmentPayload) {
2661        let OverlayFragment::DenseRange { start, payload, .. } = fragment else {
2662            unreachable!("dense payload window requires DenseRange")
2663        };
2664        let rel_start = range.start.saturating_sub(*start as usize);
2665        (rel_start, range.end.saturating_sub(range.start), payload)
2666    }
2667
2668    fn run_numbers(
2669        fragment: &OverlayFragment,
2670        range: core::ops::Range<usize>,
2671    ) -> Arc<Float64Array> {
2672        let mut b = Float64Builder::with_capacity(range.end.saturating_sub(range.start));
2673        Self::for_each_run_payload_index(fragment, range, |payload, run_idx, repeat| {
2674            if let Some(value) = payload.number_at(run_idx) {
2675                for _ in 0..repeat {
2676                    b.append_value(value);
2677                }
2678            } else {
2679                for _ in 0..repeat {
2680                    b.append_null();
2681                }
2682            }
2683        });
2684        Arc::new(b.finish())
2685    }
2686
2687    fn run_booleans(
2688        fragment: &OverlayFragment,
2689        range: core::ops::Range<usize>,
2690    ) -> Arc<BooleanArray> {
2691        let mut b = BooleanBuilder::with_capacity(range.end.saturating_sub(range.start));
2692        Self::for_each_run_payload_index(fragment, range, |payload, run_idx, repeat| {
2693            if let Some(value) = payload.boolean_at(run_idx) {
2694                for _ in 0..repeat {
2695                    b.append_value(value);
2696                }
2697            } else {
2698                for _ in 0..repeat {
2699                    b.append_null();
2700                }
2701            }
2702        });
2703        Arc::new(b.finish())
2704    }
2705
2706    fn run_text(fragment: &OverlayFragment, range: core::ops::Range<usize>) -> ArrayRef {
2707        let mut b = StringBuilder::with_capacity(
2708            range.end.saturating_sub(range.start),
2709            range.end.saturating_sub(range.start).saturating_mul(8),
2710        );
2711        Self::for_each_run_payload_index(fragment, range, |payload, run_idx, repeat| {
2712            if let Some(value) = payload.text_at(run_idx) {
2713                for _ in 0..repeat {
2714                    b.append_value(value);
2715                }
2716            } else {
2717                for _ in 0..repeat {
2718                    b.append_null();
2719                }
2720            }
2721        });
2722        Arc::new(b.finish()) as ArrayRef
2723    }
2724
2725    fn run_errors(fragment: &OverlayFragment, range: core::ops::Range<usize>) -> Arc<UInt8Array> {
2726        let mut b = UInt8Builder::with_capacity(range.end.saturating_sub(range.start));
2727        Self::for_each_run_payload_index(fragment, range, |payload, run_idx, repeat| {
2728            if let Some(value) = payload.error_at(run_idx) {
2729                for _ in 0..repeat {
2730                    b.append_value(value);
2731                }
2732            } else {
2733                for _ in 0..repeat {
2734                    b.append_null();
2735                }
2736            }
2737        });
2738        Arc::new(b.finish())
2739    }
2740
2741    fn run_type_tags(
2742        fragment: &OverlayFragment,
2743        range: core::ops::Range<usize>,
2744    ) -> Arc<UInt8Array> {
2745        let mut b = UInt8Builder::with_capacity(range.end.saturating_sub(range.start));
2746        Self::for_each_run_payload_index(fragment, range, |payload, run_idx, repeat| {
2747            let tag = payload.type_tag_at(run_idx).unwrap_or(TypeTag::Empty) as u8;
2748            for _ in 0..repeat {
2749                b.append_value(tag);
2750            }
2751        });
2752        Arc::new(b.finish())
2753    }
2754
2755    fn run_lowered_text(
2756        fragment: &OverlayFragment,
2757        range: core::ops::Range<usize>,
2758    ) -> Arc<StringArray> {
2759        let mut b = StringBuilder::with_capacity(
2760            range.end.saturating_sub(range.start),
2761            range.end.saturating_sub(range.start).saturating_mul(8),
2762        );
2763        Self::for_each_run_payload_index(fragment, range, |payload, run_idx, repeat| {
2764            let value = Self::payload_lowered_text_at(payload, run_idx);
2765            if let Some(value) = value {
2766                for _ in 0..repeat {
2767                    b.append_value(&value);
2768                }
2769            } else {
2770                for _ in 0..repeat {
2771                    b.append_null();
2772                }
2773            }
2774        });
2775        Arc::new(b.finish())
2776    }
2777
2778    fn payload_numbers_slice(
2779        payload: &OverlayFragmentPayload,
2780        start: usize,
2781        len: usize,
2782    ) -> Arc<Float64Array> {
2783        if let Some(array) = &payload.numbers {
2784            let sliced = array.slice(start, len);
2785            Arc::new(
2786                sliced
2787                    .as_any()
2788                    .downcast_ref::<Float64Array>()
2789                    .unwrap()
2790                    .clone(),
2791            )
2792        } else {
2793            Self::null_numbers(len)
2794        }
2795    }
2796
2797    fn payload_booleans_slice(
2798        payload: &OverlayFragmentPayload,
2799        start: usize,
2800        len: usize,
2801    ) -> Arc<BooleanArray> {
2802        if let Some(array) = &payload.booleans {
2803            let sliced = array.slice(start, len);
2804            Arc::new(
2805                sliced
2806                    .as_any()
2807                    .downcast_ref::<BooleanArray>()
2808                    .unwrap()
2809                    .clone(),
2810            )
2811        } else {
2812            Self::null_booleans(len)
2813        }
2814    }
2815
2816    fn payload_text_slice(payload: &OverlayFragmentPayload, start: usize, len: usize) -> ArrayRef {
2817        if let Some(array) = &payload.text {
2818            array.slice(start, len)
2819        } else {
2820            new_null_array(&DataType::Utf8, len)
2821        }
2822    }
2823
2824    fn payload_errors_slice(
2825        payload: &OverlayFragmentPayload,
2826        start: usize,
2827        len: usize,
2828    ) -> Arc<UInt8Array> {
2829        if let Some(array) = &payload.errors {
2830            let sliced = array.slice(start, len);
2831            Arc::new(
2832                sliced
2833                    .as_any()
2834                    .downcast_ref::<UInt8Array>()
2835                    .unwrap()
2836                    .clone(),
2837            )
2838        } else {
2839            Self::null_errors(len)
2840        }
2841    }
2842
2843    fn payload_type_tags_slice(
2844        payload: &OverlayFragmentPayload,
2845        start: usize,
2846        len: usize,
2847    ) -> Arc<UInt8Array> {
2848        let sliced = payload.type_tags.slice(start, len);
2849        Arc::new(
2850            sliced
2851                .as_any()
2852                .downcast_ref::<UInt8Array>()
2853                .unwrap()
2854                .clone(),
2855        )
2856    }
2857
2858    fn payload_lowered_text_materialize(
2859        payload: &OverlayFragmentPayload,
2860        start: usize,
2861        len: usize,
2862    ) -> Arc<StringArray> {
2863        let mut b = StringBuilder::with_capacity(len, len.saturating_mul(8));
2864        for idx in start..start.saturating_add(len) {
2865            if let Some(value) = Self::payload_lowered_text_at(payload, idx) {
2866                b.append_value(&value);
2867            } else {
2868                b.append_null();
2869            }
2870        }
2871        Arc::new(b.finish())
2872    }
2873
2874    fn payload_lowered_text_at(payload: &OverlayFragmentPayload, idx: usize) -> Option<String> {
2875        match payload.type_tag_at(idx)? {
2876            TypeTag::Text => payload.text_at(idx).map(|value| value.to_lowercase()),
2877            TypeTag::Number | TypeTag::DateTime | TypeTag::Duration => {
2878                payload.number_at(idx).map(|value| value.to_string())
2879            }
2880            TypeTag::Boolean => payload
2881                .boolean_at(idx)
2882                .map(|value| if value { "true" } else { "false" }.to_string()),
2883            TypeTag::Empty | TypeTag::Error | TypeTag::Pending => None,
2884        }
2885    }
2886
2887    fn null_numbers(len: usize) -> Arc<Float64Array> {
2888        let arr = new_null_array(&DataType::Float64, len);
2889        Arc::new(arr.as_any().downcast_ref::<Float64Array>().unwrap().clone())
2890    }
2891
2892    fn null_booleans(len: usize) -> Arc<BooleanArray> {
2893        let arr = new_null_array(&DataType::Boolean, len);
2894        Arc::new(arr.as_any().downcast_ref::<BooleanArray>().unwrap().clone())
2895    }
2896
2897    fn null_errors(len: usize) -> Arc<UInt8Array> {
2898        let arr = new_null_array(&DataType::UInt8, len);
2899        Arc::new(arr.as_any().downcast_ref::<UInt8Array>().unwrap().clone())
2900    }
2901
2902    fn apply_number_layer(
2903        layer: &Overlay,
2904        range: core::ops::Range<usize>,
2905        slots: &mut OverlaySlots<f64>,
2906    ) {
2907        Self::apply_fragment_layer(layer, range.clone(), slots, |payload, idx| {
2908            payload.number_at(idx)
2909        });
2910        for (off, value) in layer.iter_points() {
2911            if range.contains(off) {
2912                slots.set(*off - range.start, value.numeric_lane_value());
2913                record_overlay_select_stats(|stats| stats.point_entries_applied += 1);
2914            }
2915        }
2916    }
2917
2918    fn apply_boolean_layer(
2919        layer: &Overlay,
2920        range: core::ops::Range<usize>,
2921        slots: &mut OverlaySlots<bool>,
2922    ) {
2923        Self::apply_fragment_layer(layer, range.clone(), slots, |payload, idx| {
2924            payload.boolean_at(idx)
2925        });
2926        for (off, value) in layer.iter_points() {
2927            if range.contains(off) {
2928                slots.set(*off - range.start, value.boolean_lane_value());
2929                record_overlay_select_stats(|stats| stats.point_entries_applied += 1);
2930            }
2931        }
2932    }
2933
2934    fn apply_text_layer(
2935        layer: &Overlay,
2936        range: core::ops::Range<usize>,
2937        slots: &mut OverlaySlots<String>,
2938    ) {
2939        Self::apply_fragment_layer(layer, range.clone(), slots, |payload, idx| {
2940            payload.text_at(idx).map(ToString::to_string)
2941        });
2942        for (off, value) in layer.iter_points() {
2943            if range.contains(off) {
2944                slots.set(
2945                    *off - range.start,
2946                    value.text_lane_value().map(ToString::to_string),
2947                );
2948                record_overlay_select_stats(|stats| stats.point_entries_applied += 1);
2949            }
2950        }
2951    }
2952
2953    fn apply_error_layer(
2954        layer: &Overlay,
2955        range: core::ops::Range<usize>,
2956        slots: &mut OverlaySlots<u8>,
2957    ) {
2958        Self::apply_fragment_layer(layer, range.clone(), slots, |payload, idx| {
2959            payload.error_at(idx)
2960        });
2961        for (off, value) in layer.iter_points() {
2962            if range.contains(off) {
2963                slots.set(*off - range.start, value.error_lane_value());
2964                record_overlay_select_stats(|stats| stats.point_entries_applied += 1);
2965            }
2966        }
2967    }
2968
2969    fn apply_type_tag_layer(
2970        layer: &Overlay,
2971        range: core::ops::Range<usize>,
2972        slots: &mut OverlaySlots<u8>,
2973    ) {
2974        Self::apply_fragment_layer(layer, range.clone(), slots, |payload, idx| {
2975            payload.type_tag_at(idx).map(|tag| tag as u8)
2976        });
2977        for (off, value) in layer.iter_points() {
2978            if range.contains(off) {
2979                slots.set(*off - range.start, Some(value.type_tag() as u8));
2980                record_overlay_select_stats(|stats| stats.point_entries_applied += 1);
2981            }
2982        }
2983    }
2984
2985    fn apply_lowered_text_layer(
2986        layer: &Overlay,
2987        range: core::ops::Range<usize>,
2988        slots: &mut OverlaySlots<String>,
2989    ) {
2990        Self::apply_fragment_layer(layer, range.clone(), slots, Self::payload_lowered_text_at);
2991        for (off, value) in layer.iter_points() {
2992            if range.contains(off) {
2993                slots.set(*off - range.start, value.lowered_text_value());
2994                record_overlay_select_stats(|stats| stats.point_entries_applied += 1);
2995            }
2996        }
2997    }
2998
2999    fn apply_fragment_layer<T>(
3000        layer: &Overlay,
3001        range: core::ops::Range<usize>,
3002        slots: &mut OverlaySlots<T>,
3003        mut value_at: impl FnMut(&OverlayFragmentPayload, usize) -> Option<T>,
3004    ) {
3005        for fragment in &layer.fragments {
3006            if !fragment.has_any_in_range(range.clone()) {
3007                continue;
3008            }
3009            Self::record_fragment_intersection(fragment);
3010            Self::for_each_fragment_payload_index(
3011                fragment,
3012                range.clone(),
3013                |out_idx, payload, payload_idx| {
3014                    slots.set(out_idx, value_at(payload, payload_idx));
3015                },
3016            );
3017        }
3018    }
3019
3020    fn record_fragment_intersection(fragment: &OverlayFragment) {
3021        let shape = match fragment {
3022            OverlayFragment::SparseOffsets { .. } => OverlayFragmentShape::Sparse,
3023            OverlayFragment::DenseRange { .. } => OverlayFragmentShape::Dense,
3024            OverlayFragment::RunRange { .. } => OverlayFragmentShape::Run,
3025        };
3026        record_overlay_select_stats(|stats| {
3027            stats.fragment_intersections += 1;
3028            match shape {
3029                OverlayFragmentShape::Sparse => stats.partial_sparse_intersections += 1,
3030                OverlayFragmentShape::Dense => stats.partial_dense_intersections += 1,
3031                OverlayFragmentShape::Run => stats.partial_run_intersections += 1,
3032            }
3033        });
3034    }
3035
3036    fn for_each_fragment_payload_index(
3037        fragment: &OverlayFragment,
3038        range: core::ops::Range<usize>,
3039        mut f: impl FnMut(usize, &OverlayFragmentPayload, usize),
3040    ) {
3041        if range.is_empty() {
3042            return;
3043        }
3044        match fragment {
3045            OverlayFragment::SparseOffsets { offsets, payload } => {
3046                let start = u32::try_from(range.start).unwrap_or(u32::MAX);
3047                let lo = offsets.partition_point(|off| *off < start);
3048                let hi = offsets.partition_point(|off| (*off as usize) < range.end);
3049                for (idx, off) in offsets.iter().enumerate().take(hi).skip(lo) {
3050                    let out_idx = (*off as usize).saturating_sub(range.start);
3051                    f(out_idx, payload, idx);
3052                }
3053            }
3054            OverlayFragment::DenseRange {
3055                start,
3056                len,
3057                payload,
3058            } => {
3059                let frag_start = *start as usize;
3060                let frag_end = frag_start.saturating_add(*len as usize);
3061                let inter_start = frag_start.max(range.start);
3062                let inter_end = frag_end.min(range.end);
3063                if inter_start >= inter_end {
3064                    return;
3065                }
3066                for abs in inter_start..inter_end {
3067                    f(abs - range.start, payload, abs - frag_start);
3068                }
3069            }
3070            OverlayFragment::RunRange {
3071                start,
3072                len,
3073                run_ends,
3074                payload,
3075            } => {
3076                let frag_start = *start as usize;
3077                let frag_end = frag_start.saturating_add(*len as usize);
3078                let inter_start = frag_start.max(range.start);
3079                let inter_end = frag_end.min(range.end);
3080                if inter_start >= inter_end {
3081                    return;
3082                }
3083                let mut prev_end = 0usize;
3084                for (run_idx, run_end) in run_ends.iter().enumerate() {
3085                    let run_start_abs = frag_start.saturating_add(prev_end);
3086                    let run_end_abs = frag_start.saturating_add(*run_end as usize);
3087                    let start_abs = run_start_abs.max(inter_start);
3088                    let end_abs = run_end_abs.min(inter_end);
3089                    if start_abs < end_abs {
3090                        for abs in start_abs..end_abs {
3091                            f(abs - range.start, payload, run_idx);
3092                        }
3093                    }
3094                    prev_end = *run_end as usize;
3095                    if run_end_abs >= inter_end {
3096                        break;
3097                    }
3098                }
3099            }
3100        }
3101    }
3102
3103    fn for_each_run_payload_index(
3104        fragment: &OverlayFragment,
3105        range: core::ops::Range<usize>,
3106        mut f: impl FnMut(&OverlayFragmentPayload, usize, usize),
3107    ) {
3108        let OverlayFragment::RunRange {
3109            start,
3110            len,
3111            run_ends,
3112            payload,
3113        } = fragment
3114        else {
3115            unreachable!("run payload iteration requires RunRange")
3116        };
3117        let frag_start = *start as usize;
3118        let frag_end = frag_start.saturating_add(*len as usize);
3119        let inter_start = frag_start.max(range.start);
3120        let inter_end = frag_end.min(range.end);
3121        if inter_start >= inter_end {
3122            return;
3123        }
3124        let mut prev_end = 0usize;
3125        for (run_idx, run_end) in run_ends.iter().enumerate() {
3126            let run_start_abs = frag_start.saturating_add(prev_end);
3127            let run_end_abs = frag_start.saturating_add(*run_end as usize);
3128            let start_abs = run_start_abs.max(inter_start);
3129            let end_abs = run_end_abs.min(inter_end);
3130            if start_abs < end_abs {
3131                f(payload, run_idx, end_abs - start_abs);
3132            }
3133            prev_end = *run_end as usize;
3134            if run_end_abs >= inter_end {
3135                break;
3136            }
3137        }
3138    }
3139}
3140
3141impl OverlayFragmentPayload {
3142    #[inline]
3143    fn type_tag_at(&self, idx: usize) -> Option<TypeTag> {
3144        if idx >= self.type_tags.len() || self.type_tags.is_null(idx) {
3145            return None;
3146        }
3147        Some(TypeTag::from_u8(self.type_tags.value(idx)))
3148    }
3149}
3150
3151impl Overlay {
3152    fn full_cover_dense_fragment(
3153        &self,
3154        range: core::ops::Range<usize>,
3155    ) -> Option<&OverlayFragment> {
3156        self.full_cover_single_fragment(range, OverlayFragmentShape::Dense)
3157    }
3158
3159    fn full_cover_run_fragment(&self, range: core::ops::Range<usize>) -> Option<&OverlayFragment> {
3160        self.full_cover_single_fragment(range, OverlayFragmentShape::Run)
3161    }
3162
3163    fn full_cover_single_fragment(
3164        &self,
3165        range: core::ops::Range<usize>,
3166        shape: OverlayFragmentShape,
3167    ) -> Option<&OverlayFragment> {
3168        if range.is_empty() || self.points.keys().any(|off| range.contains(off)) {
3169            return None;
3170        }
3171        let mut found = None;
3172        for fragment in &self.fragments {
3173            if !fragment.has_any_in_range(range.clone()) {
3174                continue;
3175            }
3176            let shape_matches = matches!(
3177                (shape, fragment),
3178                (
3179                    OverlayFragmentShape::Dense,
3180                    OverlayFragment::DenseRange { .. }
3181                ) | (OverlayFragmentShape::Run, OverlayFragment::RunRange { .. })
3182            );
3183            let covers = fragment
3184                .interval_coverage()
3185                .is_some_and(|own| own.start <= range.start && range.end <= own.end);
3186            if shape_matches && covers && found.is_none() {
3187                found = Some(fragment);
3188            } else {
3189                return None;
3190            }
3191        }
3192        found
3193    }
3194}
3195fn append_overlay_value_to_lane_builders(
3196    ov: &OverlayValue,
3197    tag_b: &mut UInt8Builder,
3198    nb: &mut Float64Builder,
3199    bb: &mut BooleanBuilder,
3200    sb: &mut StringBuilder,
3201    eb: &mut UInt8Builder,
3202    non_num: &mut usize,
3203    non_bool: &mut usize,
3204    non_text: &mut usize,
3205    non_err: &mut usize,
3206) {
3207    match ov {
3208        OverlayValue::Empty => {
3209            tag_b.append_value(TypeTag::Empty as u8);
3210            nb.append_null();
3211            bb.append_null();
3212            sb.append_null();
3213            eb.append_null();
3214        }
3215        OverlayValue::Number(n) => {
3216            tag_b.append_value(TypeTag::Number as u8);
3217            nb.append_value(*n);
3218            *non_num += 1;
3219            bb.append_null();
3220            sb.append_null();
3221            eb.append_null();
3222        }
3223        OverlayValue::DateTime(serial) => {
3224            tag_b.append_value(TypeTag::DateTime as u8);
3225            nb.append_value(*serial);
3226            *non_num += 1;
3227            bb.append_null();
3228            sb.append_null();
3229            eb.append_null();
3230        }
3231        OverlayValue::Duration(serial) => {
3232            tag_b.append_value(TypeTag::Duration as u8);
3233            nb.append_value(*serial);
3234            *non_num += 1;
3235            bb.append_null();
3236            sb.append_null();
3237            eb.append_null();
3238        }
3239        OverlayValue::Boolean(b) => {
3240            tag_b.append_value(TypeTag::Boolean as u8);
3241            nb.append_null();
3242            bb.append_value(*b);
3243            *non_bool += 1;
3244            sb.append_null();
3245            eb.append_null();
3246        }
3247        OverlayValue::Text(s) => {
3248            tag_b.append_value(TypeTag::Text as u8);
3249            nb.append_null();
3250            bb.append_null();
3251            sb.append_value(s);
3252            *non_text += 1;
3253            eb.append_null();
3254        }
3255        OverlayValue::Error(code) => {
3256            tag_b.append_value(TypeTag::Error as u8);
3257            nb.append_null();
3258            bb.append_null();
3259            sb.append_null();
3260            eb.append_value(*code);
3261            *non_err += 1;
3262        }
3263        OverlayValue::Pending => {
3264            tag_b.append_value(TypeTag::Pending as u8);
3265            nb.append_null();
3266            bb.append_null();
3267            sb.append_null();
3268            eb.append_null();
3269        }
3270    }
3271}
3272
3273impl ArrowSheet {
3274    /// Return a summary of each column's chunk counts, total rows, and lane presence.
3275    pub fn shape(&self) -> Vec<ColumnShape> {
3276        self.columns
3277            .iter()
3278            .map(|c| {
3279                let chunks = c.chunks.len();
3280                let rows = self.nrows as usize;
3281                let has_num = c.chunks.iter().any(|ch| ch.meta.non_null_num > 0);
3282                let has_bool = c.chunks.iter().any(|ch| ch.meta.non_null_bool > 0);
3283                let has_text = c.chunks.iter().any(|ch| ch.meta.non_null_text > 0);
3284                let has_err = c.chunks.iter().any(|ch| ch.meta.non_null_err > 0);
3285                ColumnShape {
3286                    index: c.index,
3287                    chunks,
3288                    rows,
3289                    has_num,
3290                    has_bool,
3291                    has_text,
3292                    has_err,
3293                }
3294            })
3295            .collect()
3296    }
3297
3298    pub fn range_view(
3299        &self,
3300        sr: usize,
3301        sc: usize,
3302        er: usize,
3303        ec: usize,
3304    ) -> crate::engine::range_view::RangeView<'_> {
3305        let r0 = er.checked_sub(sr).map(|d| d + 1).unwrap_or(0);
3306        let c0 = ec.checked_sub(sc).map(|d| d + 1).unwrap_or(0);
3307        let (rows, cols) = if r0 == 0 || c0 == 0 { (0, 0) } else { (r0, c0) };
3308        crate::engine::range_view::RangeView::new(
3309            crate::engine::range_view::RangeBacking::Borrowed(self),
3310            sr,
3311            sc,
3312            er,
3313            ec,
3314            rows,
3315            cols,
3316        )
3317    }
3318
3319    /// Fast single-cell read (0-based row/col) with overlay precedence.
3320    ///
3321    /// This avoids constructing a 1x1 RangeView and is intended for tight read loops.
3322    #[inline]
3323    pub fn get_cell_value(&self, abs_row: usize, abs_col: usize) -> LiteralValue {
3324        let sheet_rows = self.nrows as usize;
3325        if abs_row >= sheet_rows {
3326            return LiteralValue::Empty;
3327        }
3328        if abs_col >= self.columns.len() {
3329            return LiteralValue::Empty;
3330        }
3331        let Some((ch_idx, in_off)) = self.chunk_of_row(abs_row) else {
3332            return LiteralValue::Empty;
3333        };
3334        let col_ref = &self.columns[abs_col];
3335        let Some(ch) = col_ref.chunk(ch_idx) else {
3336            return LiteralValue::Empty;
3337        };
3338
3339        // Overlay takes precedence: user edits over computed over base.
3340        let cascade = OverlayCascade::new(&ch.overlay, &ch.computed_overlay);
3341        if let Some(ov) = cascade.get_scalar(in_off) {
3342            return ov.to_literal();
3343        }
3344
3345        // Read tag and route to lane.
3346        let tag_u8 = ch.type_tag.value(in_off);
3347        match TypeTag::from_u8(tag_u8) {
3348            TypeTag::Empty => LiteralValue::Empty,
3349            TypeTag::Number => {
3350                if let Some(arr) = &ch.numbers {
3351                    if arr.is_null(in_off) {
3352                        return LiteralValue::Empty;
3353                    }
3354                    LiteralValue::Number(arr.value(in_off))
3355                } else {
3356                    LiteralValue::Empty
3357                }
3358            }
3359            TypeTag::DateTime => {
3360                if let Some(arr) = &ch.numbers {
3361                    if arr.is_null(in_off) {
3362                        return LiteralValue::Empty;
3363                    }
3364                    LiteralValue::from_serial_number(arr.value(in_off))
3365                } else {
3366                    LiteralValue::Empty
3367                }
3368            }
3369            TypeTag::Duration => {
3370                if let Some(arr) = &ch.numbers {
3371                    if arr.is_null(in_off) {
3372                        return LiteralValue::Empty;
3373                    }
3374                    let serial = arr.value(in_off);
3375                    let nanos_f = serial * 86_400.0 * 1_000_000_000.0;
3376                    let nanos = nanos_f.round().clamp(i64::MIN as f64, i64::MAX as f64) as i64;
3377                    LiteralValue::Duration(chrono::Duration::nanoseconds(nanos))
3378                } else {
3379                    LiteralValue::Empty
3380                }
3381            }
3382            TypeTag::Boolean => {
3383                if let Some(arr) = &ch.booleans {
3384                    if arr.is_null(in_off) {
3385                        return LiteralValue::Empty;
3386                    }
3387                    LiteralValue::Boolean(arr.value(in_off))
3388                } else {
3389                    LiteralValue::Empty
3390                }
3391            }
3392            TypeTag::Text => {
3393                if let Some(arr) = &ch.text {
3394                    if arr.is_null(in_off) {
3395                        return LiteralValue::Empty;
3396                    }
3397                    let sa = arr
3398                        .as_any()
3399                        .downcast_ref::<arrow_array::StringArray>()
3400                        .unwrap();
3401                    LiteralValue::Text(sa.value(in_off).to_string())
3402                } else {
3403                    LiteralValue::Empty
3404                }
3405            }
3406            TypeTag::Error => {
3407                if let Some(arr) = &ch.errors {
3408                    if arr.is_null(in_off) {
3409                        return LiteralValue::Empty;
3410                    }
3411                    let kind = unmap_error_code(arr.value(in_off));
3412                    LiteralValue::Error(ExcelError::new(kind))
3413                } else {
3414                    LiteralValue::Empty
3415                }
3416            }
3417            TypeTag::Pending => LiteralValue::Pending,
3418        }
3419    }
3420
3421    /// Ensure capacity to address at least `target_rows` rows by extending the row chunk map.
3422    ///
3423    /// This updates `chunk_starts`/`nrows` but does **not** eagerly densify all columns with
3424    /// new empty chunks. Missing chunks are treated as all-empty and can be materialized lazily.
3425    pub fn ensure_row_capacity(&mut self, target_rows: usize) {
3426        if target_rows as u32 <= self.nrows {
3427            return;
3428        }
3429
3430        let chunk_size = self.chunk_rows.max(1);
3431
3432        // `chunk_starts` must represent fixed-size chunk boundaries based on `chunk_rows`, not
3433        // incremental growth steps. In particular, repeated calls like ensure_row_capacity(1),
3434        // ensure_row_capacity(2), ... must NOT create a new chunk per row.
3435        if self.chunk_starts.is_empty() {
3436            self.chunk_starts.push(0);
3437        }
3438
3439        // Extend chunk starts only when `target_rows` crosses a chunk boundary.
3440        // Example: chunk_size=3, target_rows=6 => chunk_starts=[0,3]
3441        let mut next_start = self
3442            .chunk_starts
3443            .last()
3444            .copied()
3445            .unwrap_or(0)
3446            .saturating_add(chunk_size);
3447        while next_start < target_rows {
3448            self.chunk_starts.push(next_start);
3449            next_start = next_start.saturating_add(chunk_size);
3450        }
3451
3452        self.nrows = target_rows as u32;
3453
3454        // Any previously-materialized chunk may have been created when the sheet had fewer rows.
3455        // When `chunk_starts` extends, chunks that used to be "last" can become interior chunks
3456        // with a larger fixed boundary. Ensure materialized chunks are grown to their current
3457        // boundary-derived length so RangeView slicing stays in-bounds.
3458        let starts = self.chunk_starts.clone();
3459        let nrows = self.nrows as usize;
3460        let required_len_for = |ch_idx: usize| -> Option<usize> {
3461            let start = *starts.get(ch_idx)?;
3462            let end = starts.get(ch_idx + 1).copied().unwrap_or(nrows);
3463            Some(end.saturating_sub(start))
3464        };
3465
3466        for col in &mut self.columns {
3467            for (idx, ch) in col.chunks.iter_mut().enumerate() {
3468                if let Some(req) = required_len_for(idx) {
3469                    ch.grow_len_to(req);
3470                }
3471            }
3472            if !col.sparse_chunks.is_empty() {
3473                let keys: Vec<usize> = col.sparse_chunks.keys().copied().collect();
3474                for idx in keys {
3475                    if let (Some(req), Some(ch)) =
3476                        (required_len_for(idx), col.sparse_chunks.get_mut(&idx))
3477                    {
3478                        ch.grow_len_to(req);
3479                    }
3480                }
3481            }
3482        }
3483    }
3484
3485    /// Ensure a mutable chunk for a given column/chunk index.
3486    ///
3487    /// If the chunk is beyond the column's dense chunk vector, it is stored in `sparse_chunks`.
3488    pub fn ensure_column_chunk_mut(
3489        &mut self,
3490        col_idx: usize,
3491        ch_idx: usize,
3492    ) -> Option<&mut ColumnChunk> {
3493        let start = *self.chunk_starts.get(ch_idx)?;
3494        let end = self
3495            .chunk_starts
3496            .get(ch_idx + 1)
3497            .copied()
3498            .unwrap_or(self.nrows as usize);
3499        let len = end.saturating_sub(start);
3500
3501        let col = self.columns.get_mut(col_idx)?;
3502        if ch_idx < col.chunks.len() {
3503            return Some(&mut col.chunks[ch_idx]);
3504        }
3505        Some(
3506            col.sparse_chunks
3507                .entry(ch_idx)
3508                .or_insert_with(|| Self::make_empty_chunk(len)),
3509        )
3510    }
3511
3512    /// Return (chunk_idx, in_chunk_offset) for absolute 0-based row.
3513    pub fn chunk_of_row(&self, abs_row: usize) -> Option<(usize, usize)> {
3514        if abs_row >= self.nrows as usize {
3515            return None;
3516        }
3517        let ch_idx = match self.chunk_starts.binary_search(&abs_row) {
3518            Ok(i) => i,
3519            Err(0) => 0,
3520            Err(i) => i - 1,
3521        };
3522        let start = self.chunk_starts[ch_idx];
3523        Some((ch_idx, abs_row - start))
3524    }
3525
3526    fn recompute_chunk_starts(&mut self) {
3527        self.chunk_starts.clear();
3528        if let Some(col0) = self.columns.first() {
3529            let mut cur = 0usize;
3530            for ch in &col0.chunks {
3531                self.chunk_starts.push(cur);
3532                cur += ch.type_tag.len();
3533            }
3534        }
3535    }
3536
3537    fn make_empty_chunk(len: usize) -> ColumnChunk {
3538        ColumnChunk {
3539            numbers: None,
3540            booleans: None,
3541            text: None,
3542            errors: None,
3543            type_tag: Arc::new(UInt8Array::from(vec![TypeTag::Empty as u8; len])),
3544            formula_id: None,
3545            meta: ColumnChunkMeta {
3546                len,
3547                non_null_num: 0,
3548                non_null_bool: 0,
3549                non_null_text: 0,
3550                non_null_err: 0,
3551            },
3552            lazy_null_numbers: OnceCell::new(),
3553            lazy_null_booleans: OnceCell::new(),
3554            lazy_null_text: OnceCell::new(),
3555            lazy_null_errors: OnceCell::new(),
3556            lowered_text: OnceCell::new(),
3557            overlay: Overlay::new(),
3558            computed_overlay: Overlay::new(),
3559        }
3560    }
3561
3562    fn slice_chunk(ch: &ColumnChunk, off: usize, len: usize) -> ColumnChunk {
3563        // Slice type tags
3564        use arrow_array::Array;
3565        let type_tag: Arc<UInt8Array> = Arc::new(
3566            Array::slice(ch.type_tag.as_ref(), off, len)
3567                .as_any()
3568                .downcast_ref::<UInt8Array>()
3569                .unwrap()
3570                .clone(),
3571        );
3572        // Slice numbers if present and keep only if any non-null
3573        let numbers: Option<Arc<Float64Array>> = ch.numbers.as_ref().and_then(|a| {
3574            let sl = Array::slice(a.as_ref(), off, len);
3575            let fa = sl.as_any().downcast_ref::<Float64Array>().unwrap().clone();
3576            let nn = len.saturating_sub(fa.null_count());
3577            if nn == 0 { None } else { Some(Arc::new(fa)) }
3578        });
3579        let booleans: Option<Arc<BooleanArray>> = ch.booleans.as_ref().and_then(|a| {
3580            let sl = Array::slice(a.as_ref(), off, len);
3581            let ba = sl.as_any().downcast_ref::<BooleanArray>().unwrap().clone();
3582            let nn = len.saturating_sub(ba.null_count());
3583            if nn == 0 { None } else { Some(Arc::new(ba)) }
3584        });
3585        let text: Option<ArrayRef> = ch.text.as_ref().and_then(|a| {
3586            let sl = Array::slice(a.as_ref(), off, len);
3587            let sa = sl.as_any().downcast_ref::<StringArray>().unwrap().clone();
3588            let nn = len.saturating_sub(sa.null_count());
3589            if nn == 0 {
3590                None
3591            } else {
3592                Some(Arc::new(sa) as ArrayRef)
3593            }
3594        });
3595        let errors: Option<Arc<UInt8Array>> = ch.errors.as_ref().and_then(|a| {
3596            let sl = Array::slice(a.as_ref(), off, len);
3597            let ea = sl.as_any().downcast_ref::<UInt8Array>().unwrap().clone();
3598            let nn = len.saturating_sub(ea.null_count());
3599            if nn == 0 { None } else { Some(Arc::new(ea)) }
3600        });
3601        // Split overlays for this slice.
3602        let overlay = ch.overlay.slice(off, len);
3603        let computed_overlay = ch.computed_overlay.slice(off, len);
3604        let non_null_num = numbers.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
3605        let non_null_bool = booleans.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
3606        let non_null_text = text.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
3607        let non_null_err = errors.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
3608        ColumnChunk {
3609            numbers: numbers.clone(),
3610            booleans: booleans.clone(),
3611            text: text.clone(),
3612            errors: errors.clone(),
3613            type_tag,
3614            formula_id: None,
3615            meta: ColumnChunkMeta {
3616                len,
3617                non_null_num,
3618                non_null_bool,
3619                non_null_text,
3620                non_null_err,
3621            },
3622            lazy_null_numbers: OnceCell::new(),
3623            lazy_null_booleans: OnceCell::new(),
3624            lazy_null_text: OnceCell::new(),
3625            lazy_null_errors: OnceCell::new(),
3626            lowered_text: OnceCell::new(),
3627            overlay,
3628            computed_overlay,
3629        }
3630    }
3631
3632    /// Heuristic compaction: rebuilds a chunk's base arrays by applying its overlay when
3633    /// overlay density crosses thresholds. Returns true if a rebuild occurred.
3634    pub fn maybe_compact_chunk(
3635        &mut self,
3636        col_idx: usize,
3637        ch_idx: usize,
3638        abs_threshold: usize,
3639        frac_den: usize,
3640    ) -> usize {
3641        if col_idx >= self.columns.len() {
3642            return 0;
3643        }
3644
3645        let (len, tags, numbers, booleans, text, errors, non_num, non_bool, non_text, non_err) = {
3646            let Some(ch_ref) = self.columns[col_idx].chunk(ch_idx) else {
3647                return 0;
3648            };
3649            let len = ch_ref.type_tag.len();
3650            if len == 0 {
3651                return 0;
3652            }
3653
3654            let ov_len = ch_ref.overlay.len();
3655            let den = frac_den.max(1);
3656            let trig = ov_len > (len / den) || ov_len > abs_threshold;
3657            if !trig {
3658                return 0;
3659            }
3660
3661            // Rebuild: merge base lanes with overlays row-by-row.
3662            let mut tag_b = UInt8Builder::with_capacity(len);
3663            let mut nb = Float64Builder::with_capacity(len);
3664            let mut bb = BooleanBuilder::with_capacity(len);
3665            let mut sb = StringBuilder::with_capacity(len, len * 8);
3666            let mut eb = UInt8Builder::with_capacity(len);
3667            let mut non_num = 0usize;
3668            let mut non_bool = 0usize;
3669            let mut non_text = 0usize;
3670            let mut non_err = 0usize;
3671
3672            for i in 0..len {
3673                // If overlay present, use it. Otherwise, use base tag+lane.
3674                if let Some(ov) = ch_ref.overlay.get_scalar(i) {
3675                    let ov = ov.to_overlay_value();
3676                    append_overlay_value_to_lane_builders(
3677                        &ov,
3678                        &mut tag_b,
3679                        &mut nb,
3680                        &mut bb,
3681                        &mut sb,
3682                        &mut eb,
3683                        &mut non_num,
3684                        &mut non_bool,
3685                        &mut non_text,
3686                        &mut non_err,
3687                    );
3688                } else {
3689                    let tag = TypeTag::from_u8(ch_ref.type_tag.value(i));
3690                    match tag {
3691                        TypeTag::Empty => {
3692                            tag_b.append_value(TypeTag::Empty as u8);
3693                            nb.append_null();
3694                            bb.append_null();
3695                            sb.append_null();
3696                            eb.append_null();
3697                        }
3698                        TypeTag::Number | TypeTag::DateTime | TypeTag::Duration => {
3699                            tag_b.append_value(tag as u8);
3700                            if let Some(a) = &ch_ref.numbers {
3701                                let fa = a.as_any().downcast_ref::<Float64Array>().unwrap();
3702                                if fa.is_null(i) {
3703                                    nb.append_null();
3704                                } else {
3705                                    nb.append_value(fa.value(i));
3706                                    non_num += 1;
3707                                }
3708                            } else {
3709                                nb.append_null();
3710                            }
3711                            bb.append_null();
3712                            sb.append_null();
3713                            eb.append_null();
3714                        }
3715                        TypeTag::Boolean => {
3716                            tag_b.append_value(TypeTag::Boolean as u8);
3717                            nb.append_null();
3718                            if let Some(a) = &ch_ref.booleans {
3719                                let ba = a.as_any().downcast_ref::<BooleanArray>().unwrap();
3720                                if ba.is_null(i) {
3721                                    bb.append_null();
3722                                } else {
3723                                    bb.append_value(ba.value(i));
3724                                    non_bool += 1;
3725                                }
3726                            } else {
3727                                bb.append_null();
3728                            }
3729                            sb.append_null();
3730                            eb.append_null();
3731                        }
3732                        TypeTag::Text => {
3733                            tag_b.append_value(TypeTag::Text as u8);
3734                            nb.append_null();
3735                            bb.append_null();
3736                            if let Some(a) = &ch_ref.text {
3737                                let sa = a.as_any().downcast_ref::<StringArray>().unwrap();
3738                                if sa.is_null(i) {
3739                                    sb.append_null();
3740                                } else {
3741                                    sb.append_value(sa.value(i));
3742                                    non_text += 1;
3743                                }
3744                            } else {
3745                                sb.append_null();
3746                            }
3747                            eb.append_null();
3748                        }
3749                        TypeTag::Error => {
3750                            tag_b.append_value(TypeTag::Error as u8);
3751                            nb.append_null();
3752                            bb.append_null();
3753                            sb.append_null();
3754                            if let Some(a) = &ch_ref.errors {
3755                                let ea = a.as_any().downcast_ref::<UInt8Array>().unwrap();
3756                                if ea.is_null(i) {
3757                                    eb.append_null();
3758                                } else {
3759                                    eb.append_value(ea.value(i));
3760                                    non_err += 1;
3761                                }
3762                            } else {
3763                                eb.append_null();
3764                            }
3765                        }
3766                        TypeTag::Pending => {
3767                            tag_b.append_value(TypeTag::Pending as u8);
3768                            nb.append_null();
3769                            bb.append_null();
3770                            sb.append_null();
3771                            eb.append_null();
3772                        }
3773                    }
3774                }
3775            }
3776
3777            let tags = Arc::new(tag_b.finish());
3778            let numbers = {
3779                let a = nb.finish();
3780                if non_num == 0 {
3781                    None
3782                } else {
3783                    Some(Arc::new(a))
3784                }
3785            };
3786            let booleans = {
3787                let a = bb.finish();
3788                if non_bool == 0 {
3789                    None
3790                } else {
3791                    Some(Arc::new(a))
3792                }
3793            };
3794            let text = {
3795                let a = sb.finish();
3796                if non_text == 0 {
3797                    None
3798                } else {
3799                    Some(Arc::new(a) as ArrayRef)
3800                }
3801            };
3802            let errors = {
3803                let a = eb.finish();
3804                if non_err == 0 {
3805                    None
3806                } else {
3807                    Some(Arc::new(a))
3808                }
3809            };
3810
3811            (
3812                len, tags, numbers, booleans, text, errors, non_num, non_bool, non_text, non_err,
3813            )
3814        };
3815
3816        let Some(ch_mut) = self.columns[col_idx].chunk_mut(ch_idx) else {
3817            return 0;
3818        };
3819
3820        ch_mut.type_tag = tags;
3821        ch_mut.numbers = numbers;
3822        ch_mut.booleans = booleans;
3823        ch_mut.text = text;
3824        ch_mut.errors = errors;
3825        let freed = ch_mut.overlay.clear();
3826        ch_mut.lowered_text = OnceCell::new();
3827        ch_mut.meta.len = len;
3828        ch_mut.meta.non_null_num = non_num;
3829        ch_mut.meta.non_null_bool = non_bool;
3830        ch_mut.meta.non_null_text = non_text;
3831        ch_mut.meta.non_null_err = non_err;
3832        freed
3833    }
3834
3835    /// Compact a dense chunk's computed overlay into its base arrays, freeing overlay memory
3836    /// while preserving the data. Returns the number of bytes freed.
3837    ///
3838    /// This is the computed-overlay counterpart of `maybe_compact_chunk` (which compacts
3839    /// user-edit overlays). The read cascade is `overlay → computed_overlay → base`, so
3840    /// folding computed overlay entries into base arrays is transparent: the `overlay` layer
3841    /// (user edits) is left untouched and still takes precedence on reads.
3842    pub fn compact_computed_overlay_chunk(&mut self, col_idx: usize, ch_idx: usize) -> usize {
3843        if col_idx >= self.columns.len() {
3844            return 0;
3845        }
3846
3847        let (len, tags, numbers, booleans, text, errors, non_num, non_bool, non_text, non_err) = {
3848            let Some(ch_ref) = self.columns[col_idx].chunk(ch_idx) else {
3849                return 0;
3850            };
3851            let len = ch_ref.type_tag.len();
3852            if len == 0 || ch_ref.computed_overlay.is_empty() {
3853                return 0;
3854            }
3855
3856            let mut tag_b = UInt8Builder::with_capacity(len);
3857            let mut nb = Float64Builder::with_capacity(len);
3858            let mut bb = BooleanBuilder::with_capacity(len);
3859            let mut sb = StringBuilder::with_capacity(len, len * 8);
3860            let mut eb = UInt8Builder::with_capacity(len);
3861            let mut non_num = 0usize;
3862            let mut non_bool = 0usize;
3863            let mut non_text = 0usize;
3864            let mut non_err = 0usize;
3865
3866            for i in 0..len {
3867                if let Some(ov) = ch_ref.computed_overlay.get_scalar(i) {
3868                    let ov = ov.to_overlay_value();
3869                    append_overlay_value_to_lane_builders(
3870                        &ov,
3871                        &mut tag_b,
3872                        &mut nb,
3873                        &mut bb,
3874                        &mut sb,
3875                        &mut eb,
3876                        &mut non_num,
3877                        &mut non_bool,
3878                        &mut non_text,
3879                        &mut non_err,
3880                    );
3881                } else {
3882                    let tag = TypeTag::from_u8(ch_ref.type_tag.value(i));
3883                    match tag {
3884                        TypeTag::Empty => {
3885                            tag_b.append_value(TypeTag::Empty as u8);
3886                            nb.append_null();
3887                            bb.append_null();
3888                            sb.append_null();
3889                            eb.append_null();
3890                        }
3891                        TypeTag::Number | TypeTag::DateTime | TypeTag::Duration => {
3892                            tag_b.append_value(tag as u8);
3893                            if let Some(a) = &ch_ref.numbers {
3894                                let fa = a.as_any().downcast_ref::<Float64Array>().unwrap();
3895                                if fa.is_null(i) {
3896                                    nb.append_null();
3897                                } else {
3898                                    nb.append_value(fa.value(i));
3899                                    non_num += 1;
3900                                }
3901                            } else {
3902                                nb.append_null();
3903                            }
3904                            bb.append_null();
3905                            sb.append_null();
3906                            eb.append_null();
3907                        }
3908                        TypeTag::Boolean => {
3909                            tag_b.append_value(TypeTag::Boolean as u8);
3910                            nb.append_null();
3911                            if let Some(a) = &ch_ref.booleans {
3912                                let ba = a.as_any().downcast_ref::<BooleanArray>().unwrap();
3913                                if ba.is_null(i) {
3914                                    bb.append_null();
3915                                } else {
3916                                    bb.append_value(ba.value(i));
3917                                    non_bool += 1;
3918                                }
3919                            } else {
3920                                bb.append_null();
3921                            }
3922                            sb.append_null();
3923                            eb.append_null();
3924                        }
3925                        TypeTag::Text => {
3926                            tag_b.append_value(TypeTag::Text as u8);
3927                            nb.append_null();
3928                            bb.append_null();
3929                            if let Some(a) = &ch_ref.text {
3930                                let sa = a.as_any().downcast_ref::<StringArray>().unwrap();
3931                                if sa.is_null(i) {
3932                                    sb.append_null();
3933                                } else {
3934                                    sb.append_value(sa.value(i));
3935                                    non_text += 1;
3936                                }
3937                            } else {
3938                                sb.append_null();
3939                            }
3940                            eb.append_null();
3941                        }
3942                        TypeTag::Error => {
3943                            tag_b.append_value(TypeTag::Error as u8);
3944                            nb.append_null();
3945                            bb.append_null();
3946                            sb.append_null();
3947                            if let Some(a) = &ch_ref.errors {
3948                                let ea = a.as_any().downcast_ref::<UInt8Array>().unwrap();
3949                                if ea.is_null(i) {
3950                                    eb.append_null();
3951                                } else {
3952                                    eb.append_value(ea.value(i));
3953                                    non_err += 1;
3954                                }
3955                            } else {
3956                                eb.append_null();
3957                            }
3958                        }
3959                        TypeTag::Pending => {
3960                            tag_b.append_value(TypeTag::Pending as u8);
3961                            nb.append_null();
3962                            bb.append_null();
3963                            sb.append_null();
3964                            eb.append_null();
3965                        }
3966                    }
3967                }
3968            }
3969
3970            let tags = Arc::new(tag_b.finish());
3971            let numbers = {
3972                let a = nb.finish();
3973                if non_num == 0 {
3974                    None
3975                } else {
3976                    Some(Arc::new(a))
3977                }
3978            };
3979            let booleans = {
3980                let a = bb.finish();
3981                if non_bool == 0 {
3982                    None
3983                } else {
3984                    Some(Arc::new(a))
3985                }
3986            };
3987            let text = {
3988                let a = sb.finish();
3989                if non_text == 0 {
3990                    None
3991                } else {
3992                    Some(Arc::new(a) as ArrayRef)
3993                }
3994            };
3995            let errors = {
3996                let a = eb.finish();
3997                if non_err == 0 {
3998                    None
3999                } else {
4000                    Some(Arc::new(a))
4001                }
4002            };
4003
4004            (
4005                len, tags, numbers, booleans, text, errors, non_num, non_bool, non_text, non_err,
4006            )
4007        };
4008
4009        let Some(ch_mut) = self.columns[col_idx].chunk_mut(ch_idx) else {
4010            return 0;
4011        };
4012
4013        ch_mut.type_tag = tags;
4014        ch_mut.numbers = numbers;
4015        ch_mut.booleans = booleans;
4016        ch_mut.text = text;
4017        ch_mut.errors = errors;
4018        let freed = ch_mut.computed_overlay.clear();
4019        ch_mut.lowered_text = OnceCell::new();
4020        ch_mut.meta.len = len;
4021        ch_mut.meta.non_null_num = non_num;
4022        ch_mut.meta.non_null_bool = non_bool;
4023        ch_mut.meta.non_null_text = non_text;
4024        ch_mut.meta.non_null_err = non_err;
4025        freed
4026    }
4027
    /// Compact a sparse chunk's computed overlay into its base arrays.
    ///
    /// This is a thin alias for `compact_computed_overlay_chunk`: the chunk is looked up by
    /// `(col_idx, ch_idx)` through the same accessor regardless of whether it is stored densely
    /// or sparsely, so no separate implementation is needed.
    ///
    /// Returns whatever `compact_computed_overlay_chunk` returns for the same arguments
    /// (the value produced by clearing the computed overlay, or `0` when the chunk is absent).
    pub fn compact_computed_overlay_sparse_chunk(
        &mut self,
        col_idx: usize,
        ch_idx: usize,
    ) -> usize {
        // Sparse chunks are accessed via the same chunk/chunk_mut API,
        // so we delegate to the dense method which already handles both.
        self.compact_computed_overlay_chunk(col_idx, ch_idx)
    }
4039
4040    /// Insert `count` rows before absolute 0-based row `before`.
4041    pub fn insert_rows(&mut self, before: usize, count: usize) {
4042        if count == 0 {
4043            return;
4044        }
4045
4046        let total_rows = self.nrows as usize;
4047        if total_rows == 0 {
4048            self.nrows = count as u32;
4049            if self.nrows > 0 && self.chunk_starts.is_empty() {
4050                self.chunk_starts.push(0);
4051            }
4052            return;
4053        }
4054
4055        // Ensure a valid chunk map for non-empty sheets.
4056        if self.chunk_starts.is_empty() {
4057            self.chunk_starts.push(0);
4058        }
4059
4060        // "Dense" mode: every column has every chunk (legacy invariant).
4061        let dense_aligned = self
4062            .columns
4063            .iter()
4064            .all(|c| c.sparse_chunks.is_empty() && c.chunks.len() == self.chunk_starts.len());
4065
4066        let insert_at = before.min(total_rows);
4067        let (split_idx, split_off) = if insert_at == total_rows {
4068            // Append at end: split after last chunk.
4069            let last_idx = self.chunk_starts.len() - 1;
4070            let last_start = self.chunk_starts[last_idx];
4071            let last_len = total_rows.saturating_sub(last_start);
4072            (last_idx, last_len)
4073        } else {
4074            self.chunk_of_row(insert_at).unwrap_or((0, 0))
4075        };
4076
4077        if dense_aligned {
4078            // Rebuild chunks for each column (including inserted empty chunk) and recompute starts.
4079            for col in &mut self.columns {
4080                let mut new_chunks: Vec<ColumnChunk> = Vec::with_capacity(col.chunks.len() + 2);
4081                for i in 0..col.chunks.len() {
4082                    if i != split_idx {
4083                        new_chunks.push(col.chunks[i].clone());
4084                    } else {
4085                        let orig = &col.chunks[i];
4086                        let len = orig.type_tag.len();
4087                        if split_off > 0 {
4088                            new_chunks.push(Self::slice_chunk(orig, 0, split_off));
4089                        }
4090                        new_chunks.push(Self::make_empty_chunk(count));
4091                        if split_off < len {
4092                            new_chunks.push(Self::slice_chunk(orig, split_off, len - split_off));
4093                        }
4094                    }
4095                }
4096                col.chunks = new_chunks;
4097                col.sparse_chunks.clear();
4098            }
4099            self.nrows = (total_rows + count) as u32;
4100            self.recompute_chunk_starts();
4101            return;
4102        }
4103
4104        // Sparse-aware mode: `chunk_starts` is authoritative and missing chunks are treated as empty.
4105        #[derive(Clone, Copy)]
4106        enum PlanItem {
4107            Slice {
4108                old_idx: usize,
4109                off: usize,
4110                len: usize,
4111            },
4112            Empty {
4113                len: usize,
4114            },
4115        }
4116
4117        let mut plan: Vec<PlanItem> = Vec::with_capacity(self.chunk_starts.len() + 2);
4118        for old_idx in 0..self.chunk_starts.len() {
4119            let ch_start = self.chunk_starts[old_idx];
4120            let ch_end = self
4121                .chunk_starts
4122                .get(old_idx + 1)
4123                .copied()
4124                .unwrap_or(total_rows);
4125            let ch_len = ch_end.saturating_sub(ch_start);
4126            if ch_len == 0 {
4127                continue;
4128            }
4129
4130            if old_idx != split_idx {
4131                plan.push(PlanItem::Slice {
4132                    old_idx,
4133                    off: 0,
4134                    len: ch_len,
4135                });
4136                continue;
4137            }
4138
4139            let left_len = split_off.min(ch_len);
4140            let right_len = ch_len.saturating_sub(left_len);
4141            if left_len > 0 {
4142                plan.push(PlanItem::Slice {
4143                    old_idx,
4144                    off: 0,
4145                    len: left_len,
4146                });
4147            }
4148            plan.push(PlanItem::Empty { len: count });
4149            if right_len > 0 {
4150                plan.push(PlanItem::Slice {
4151                    old_idx,
4152                    off: left_len,
4153                    len: right_len,
4154                });
4155            }
4156        }
4157
4158        let mut new_starts: Vec<usize> = Vec::with_capacity(plan.len());
4159        let mut cur = 0usize;
4160        for item in &plan {
4161            let len = match *item {
4162                PlanItem::Slice { len, .. } => len,
4163                PlanItem::Empty { len } => len,
4164            };
4165            if len == 0 {
4166                continue;
4167            }
4168            new_starts.push(cur);
4169            cur = cur.saturating_add(len);
4170        }
4171
4172        debug_assert_eq!(cur, total_rows.saturating_add(count));
4173
4174        // Update sheet row layout first.
4175        self.nrows = (total_rows + count) as u32;
4176        self.chunk_starts = new_starts;
4177
4178        // Rebuild stored chunks per column using the plan.
4179        for col in &mut self.columns {
4180            let old_dense = std::mem::take(&mut col.chunks);
4181            let old_sparse = std::mem::take(&mut col.sparse_chunks);
4182            let get_old = |idx: usize| -> Option<&ColumnChunk> {
4183                if idx < old_dense.len() {
4184                    Some(&old_dense[idx])
4185                } else {
4186                    old_sparse.get(&idx)
4187                }
4188            };
4189
4190            let mut dense: Vec<ColumnChunk> = Vec::new();
4191            let mut sparse: FxHashMap<usize, ColumnChunk> = FxHashMap::default();
4192            let mut dense_prefix = true;
4193
4194            for (new_idx, item) in plan.iter().enumerate() {
4195                let produced: Option<ColumnChunk> = match *item {
4196                    PlanItem::Empty { .. } => None,
4197                    PlanItem::Slice { old_idx, off, len } => match get_old(old_idx) {
4198                        Some(orig) => {
4199                            if off == 0 && len == orig.type_tag.len() {
4200                                Some(orig.clone())
4201                            } else {
4202                                Some(Self::slice_chunk(orig, off, len))
4203                            }
4204                        }
4205                        None => None,
4206                    },
4207                };
4208
4209                if let Some(ch) = produced {
4210                    if dense_prefix && new_idx == dense.len() {
4211                        dense.push(ch);
4212                    } else {
4213                        sparse.insert(new_idx, ch);
4214                        dense_prefix = false;
4215                    }
4216                } else if dense_prefix && new_idx == dense.len() {
4217                    dense_prefix = false;
4218                }
4219            }
4220
4221            col.chunks = dense;
4222            col.sparse_chunks = sparse;
4223        }
4224    }
4225
4226    /// Delete `count` rows starting from absolute 0-based row `start`.
4227    pub fn delete_rows(&mut self, start: usize, count: usize) {
4228        if count == 0 || self.nrows == 0 {
4229            return;
4230        }
4231
4232        let total_rows = self.nrows as usize;
4233        if start >= total_rows {
4234            return;
4235        }
4236        let end = (start + count).min(total_rows);
4237        let del_len = end.saturating_sub(start);
4238        if del_len == 0 {
4239            return;
4240        }
4241
4242        // Ensure a valid chunk map for non-empty sheets.
4243        if total_rows > 0 && self.chunk_starts.is_empty() {
4244            self.chunk_starts.push(0);
4245        }
4246
4247        // "Dense" mode: every column has every chunk (legacy invariant).
4248        let dense_aligned = self
4249            .columns
4250            .iter()
4251            .all(|c| c.sparse_chunks.is_empty() && c.chunks.len() == self.chunk_starts.len());
4252
4253        if dense_aligned {
4254            // Dense rebuild by slicing out the deleted window.
4255            for col in &mut self.columns {
4256                let mut new_chunks: Vec<ColumnChunk> = Vec::new();
4257                let mut cur_start = 0usize;
4258                for ch in &col.chunks {
4259                    let len = ch.type_tag.len();
4260                    let ch_end = cur_start + len;
4261                    // No overlap
4262                    if ch_end <= start || cur_start >= end {
4263                        new_chunks.push(ch.clone());
4264                    } else {
4265                        // Overlap exists
4266                        let del_start = start.max(cur_start);
4267                        let del_end = end.min(ch_end);
4268                        let left_len = del_start.saturating_sub(cur_start);
4269                        let right_len = ch_end.saturating_sub(del_end);
4270                        if left_len > 0 {
4271                            new_chunks.push(Self::slice_chunk(ch, 0, left_len));
4272                        }
4273                        if right_len > 0 {
4274                            let off = len - right_len;
4275                            new_chunks.push(Self::slice_chunk(ch, off, right_len));
4276                        }
4277                    }
4278                    cur_start = ch_end;
4279                }
4280                col.chunks = new_chunks;
4281                col.sparse_chunks.clear();
4282            }
4283            self.nrows = (total_rows - del_len) as u32;
4284            self.recompute_chunk_starts();
4285            return;
4286        }
4287
4288        // Sparse-aware mode: `chunk_starts` is authoritative and missing chunks are treated as empty.
4289        #[derive(Clone, Copy)]
4290        enum PlanItem {
4291            Slice {
4292                old_idx: usize,
4293                off: usize,
4294                len: usize,
4295            },
4296        }
4297
4298        let mut plan: Vec<PlanItem> = Vec::with_capacity(self.chunk_starts.len());
4299        for old_idx in 0..self.chunk_starts.len() {
4300            let ch_start = self.chunk_starts[old_idx];
4301            let ch_end = self
4302                .chunk_starts
4303                .get(old_idx + 1)
4304                .copied()
4305                .unwrap_or(total_rows);
4306            let ch_len = ch_end.saturating_sub(ch_start);
4307            if ch_len == 0 {
4308                continue;
4309            }
4310
4311            // No overlap
4312            if ch_end <= start || ch_start >= end {
4313                plan.push(PlanItem::Slice {
4314                    old_idx,
4315                    off: 0,
4316                    len: ch_len,
4317                });
4318                continue;
4319            }
4320
4321            // Left remainder
4322            if start > ch_start {
4323                let left_end = start.min(ch_end);
4324                let left_len = left_end.saturating_sub(ch_start);
4325                if left_len > 0 {
4326                    plan.push(PlanItem::Slice {
4327                        old_idx,
4328                        off: 0,
4329                        len: left_len,
4330                    });
4331                }
4332            }
4333
4334            // Right remainder
4335            if end < ch_end {
4336                let right_off = end.saturating_sub(ch_start);
4337                let right_len = ch_end.saturating_sub(end);
4338                if right_len > 0 {
4339                    plan.push(PlanItem::Slice {
4340                        old_idx,
4341                        off: right_off,
4342                        len: right_len,
4343                    });
4344                }
4345            }
4346        }
4347
4348        let mut new_starts: Vec<usize> = Vec::with_capacity(plan.len());
4349        let mut cur = 0usize;
4350        for item in &plan {
4351            let len = match *item {
4352                PlanItem::Slice { len, .. } => len,
4353            };
4354            if len == 0 {
4355                continue;
4356            }
4357            new_starts.push(cur);
4358            cur = cur.saturating_add(len);
4359        }
4360
4361        debug_assert_eq!(cur, total_rows.saturating_sub(del_len));
4362
4363        // Update sheet row layout first.
4364        self.nrows = (total_rows - del_len) as u32;
4365        self.chunk_starts = new_starts;
4366
4367        // Rebuild stored chunks per column using the plan.
4368        for col in &mut self.columns {
4369            let old_dense = std::mem::take(&mut col.chunks);
4370            let old_sparse = std::mem::take(&mut col.sparse_chunks);
4371            let get_old = |idx: usize| -> Option<&ColumnChunk> {
4372                if idx < old_dense.len() {
4373                    Some(&old_dense[idx])
4374                } else {
4375                    old_sparse.get(&idx)
4376                }
4377            };
4378
4379            let mut dense: Vec<ColumnChunk> = Vec::new();
4380            let mut sparse: FxHashMap<usize, ColumnChunk> = FxHashMap::default();
4381            let mut dense_prefix = true;
4382
4383            for (new_idx, item) in plan.iter().enumerate() {
4384                let produced: Option<ColumnChunk> = match *item {
4385                    PlanItem::Slice { old_idx, off, len } => match get_old(old_idx) {
4386                        Some(orig) => {
4387                            if off == 0 && len == orig.type_tag.len() {
4388                                Some(orig.clone())
4389                            } else {
4390                                Some(Self::slice_chunk(orig, off, len))
4391                            }
4392                        }
4393                        None => None,
4394                    },
4395                };
4396
4397                if let Some(ch) = produced {
4398                    if dense_prefix && new_idx == dense.len() {
4399                        dense.push(ch);
4400                    } else {
4401                        sparse.insert(new_idx, ch);
4402                        dense_prefix = false;
4403                    }
4404                } else if dense_prefix && new_idx == dense.len() {
4405                    dense_prefix = false;
4406                }
4407            }
4408
4409            col.chunks = dense;
4410            col.sparse_chunks = sparse;
4411        }
4412    }
4413
4414    /// Insert `count` columns before absolute 0-based column `before` with empty chunks.
4415    pub fn insert_columns(&mut self, before: usize, count: usize) {
4416        if count == 0 {
4417            return;
4418        }
4419        // Determine chunk schema from first column if present
4420        let empty_col = |lens: &[usize]| -> ArrowColumn {
4421            let mut chunks = Vec::with_capacity(lens.len());
4422            for &l in lens {
4423                chunks.push(Self::make_empty_chunk(l));
4424            }
4425            ArrowColumn {
4426                chunks,
4427                sparse_chunks: FxHashMap::default(),
4428                index: 0,
4429            }
4430        };
4431        let dense_aligned = !self.columns.is_empty()
4432            && self
4433                .columns
4434                .iter()
4435                .all(|c| c.sparse_chunks.is_empty() && c.chunks.len() == self.chunk_starts.len());
4436
4437        let lens: Vec<usize> = if dense_aligned {
4438            self.columns[0]
4439                .chunks
4440                .iter()
4441                .map(|c| c.type_tag.len())
4442                .collect()
4443        } else if self.columns.is_empty() {
4444            // No columns: single chunk matching nrows if any
4445            if self.nrows > 0 {
4446                vec![self.nrows as usize]
4447            } else {
4448                Vec::new()
4449            }
4450        } else {
4451            // Sparse sheet: keep inserted columns cheap by materializing no chunks.
4452            Vec::new()
4453        };
4454        let mut cols_new: Vec<ArrowColumn> = Vec::with_capacity(self.columns.len() + count);
4455        let before_idx = before.min(self.columns.len());
4456        for (i, col) in self.columns.iter_mut().enumerate() {
4457            if i == before_idx {
4458                for _ in 0..count {
4459                    cols_new.push(empty_col(&lens));
4460                }
4461            }
4462            cols_new.push(col.clone());
4463        }
4464        if before_idx == self.columns.len() {
4465            for _ in 0..count {
4466                cols_new.push(empty_col(&lens));
4467            }
4468        }
4469        // Fix column indices
4470        for (idx, col) in cols_new.iter_mut().enumerate() {
4471            col.index = idx as u32;
4472        }
4473        self.columns = cols_new;
4474        // chunk_starts unchanged; lens were matched
4475    }
4476
4477    /// Delete `count` columns starting at absolute 0-based column `start`.
4478    pub fn delete_columns(&mut self, start: usize, count: usize) {
4479        if count == 0 || self.columns.is_empty() {
4480            return;
4481        }
4482        let end = (start + count).min(self.columns.len());
4483        if start >= end {
4484            return;
4485        }
4486        self.columns.drain(start..end);
4487        for (idx, col) in self.columns.iter_mut().enumerate() {
4488            col.index = idx as u32;
4489        }
4490    }
4491}
4492
/// Plain-data summary of one column's layout and which value lanes it uses.
///
/// NOTE(review): the code that fills this struct is outside the visible section; the field
/// descriptions below are inferred from the names and should be confirmed against the producer.
#[derive(Debug, Clone, Copy)]
pub struct ColumnShape {
    /// Column index (presumably the 0-based position, matching `ArrowColumn::index`).
    pub index: u32,
    /// Number of chunks reported for the column.
    pub chunks: usize,
    /// Number of rows reported for the column.
    pub rows: usize,
    /// Presumably true when the column holds any numeric values.
    pub has_num: bool,
    /// Presumably true when the column holds any boolean values.
    pub has_bool: bool,
    /// Presumably true when the column holds any text values.
    pub has_text: bool,
    /// Presumably true when the column holds any error values.
    pub has_err: bool,
}
4503
4504#[cfg(test)]
4505mod tests {
4506    use super::*;
4507    use arrow_array::Array;
4508    use arrow_schema::DataType;
4509    use chrono::Datelike;
4510
4511    fn add_overlay_stats(into: &mut OverlayDebugStats, next: OverlayDebugStats) {
4512        into.points += next.points;
4513        into.sparse_fragments += next.sparse_fragments;
4514        into.dense_fragments += next.dense_fragments;
4515        into.run_fragments += next.run_fragments;
4516        into.covered_len += next.covered_len;
4517    }
4518
4519    fn column_overlay_stats(
4520        sheet: &ArrowSheet,
4521        col_idx: usize,
4522        computed: bool,
4523    ) -> OverlayDebugStats {
4524        let mut stats = OverlayDebugStats::default();
4525        let Some(column) = sheet.columns.get(col_idx) else {
4526            return stats;
4527        };
4528        for chunk in &column.chunks {
4529            add_overlay_stats(
4530                &mut stats,
4531                if computed {
4532                    chunk.computed_overlay.debug_stats()
4533                } else {
4534                    chunk.overlay.debug_stats()
4535                },
4536            );
4537        }
4538        for chunk in column.sparse_chunks.values() {
4539            add_overlay_stats(
4540                &mut stats,
4541                if computed {
4542                    chunk.computed_overlay.debug_stats()
4543                } else {
4544                    chunk.overlay.debug_stats()
4545                },
4546            );
4547        }
4548        stats
4549    }
4550
4551    fn assert_column_overlays_normalized(sheet: &ArrowSheet, col_idx: usize) {
4552        let column = &sheet.columns[col_idx];
4553        for chunk in &column.chunks {
4554            assert!(chunk.overlay.debug_is_normalized());
4555            assert!(chunk.computed_overlay.debug_is_normalized());
4556            assert_eq!(
4557                chunk.overlay.estimated_bytes(),
4558                chunk.overlay.debug_recomputed_estimated_bytes()
4559            );
4560            assert_eq!(
4561                chunk.computed_overlay.estimated_bytes(),
4562                chunk.computed_overlay.debug_recomputed_estimated_bytes()
4563            );
4564        }
4565        for chunk in column.sparse_chunks.values() {
4566            assert!(chunk.overlay.debug_is_normalized());
4567            assert!(chunk.computed_overlay.debug_is_normalized());
4568            assert_eq!(
4569                chunk.overlay.estimated_bytes(),
4570                chunk.overlay.debug_recomputed_estimated_bytes()
4571            );
4572            assert_eq!(
4573                chunk.computed_overlay.estimated_bytes(),
4574                chunk.computed_overlay.debug_recomputed_estimated_bytes()
4575            );
4576        }
4577    }
4578
4579    fn column_computed_overlay_estimated_bytes(sheet: &ArrowSheet, col_idx: usize) -> usize {
4580        let Some(column) = sheet.columns.get(col_idx) else {
4581            return 0;
4582        };
4583        column
4584            .chunks
4585            .iter()
4586            .map(|chunk| chunk.computed_overlay.estimated_bytes())
4587            .chain(
4588                column
4589                    .sparse_chunks
4590                    .values()
4591                    .map(|chunk| chunk.computed_overlay.estimated_bytes()),
4592            )
4593            .fold(0usize, usize::saturating_add)
4594    }
4595
4596    #[derive(Debug, Clone, Copy)]
4597    enum Phase4ProbeFixture {
4598        PointNumeric,
4599        DenseNumeric,
4600        RunNumeric,
4601        SparseNumeric,
4602        EmptyRun,
4603        MixedDense,
4604    }
4605
4606    impl Phase4ProbeFixture {
4607        fn name(self) -> &'static str {
4608            match self {
4609                Phase4ProbeFixture::PointNumeric => "point_numeric",
4610                Phase4ProbeFixture::DenseNumeric => "dense_numeric",
4611                Phase4ProbeFixture::RunNumeric => "run_numeric",
4612                Phase4ProbeFixture::SparseNumeric => "sparse_numeric",
4613                Phase4ProbeFixture::EmptyRun => "empty_run",
4614                Phase4ProbeFixture::MixedDense => "mixed_dense",
4615            }
4616        }
4617    }
4618
    /// Result of one timed scan over the probe sheet, as filled in by the `measure_probe_*`
    /// helpers.
    #[derive(Debug, serde::Serialize)]
    struct Phase4ProbeOp {
        /// Wall-clock duration of the scan in milliseconds.
        ms: f64,
        /// Number of slice segments the view yielded.
        segments: usize,
        /// Number of per-column arrays visited across all segments.
        arrays: usize,
        /// Sum of the row lengths reported by the segments.
        rows_scanned: usize,
        /// Sum over all valid entries of a probe-specific quantity (the value itself, the tag
        /// byte, or the text length in the probes visible here); used to keep the scan
        /// observable and to compare runs.
        checksum: f64,
        /// Number of valid (non-null) entries encountered.
        non_null: usize,
    }
4628
    /// One serialized report row for a probe run: the fixture, overlay statistics, and the
    /// timing of each scan kind.
    ///
    /// NOTE(review): the code that fills this struct is outside the visible section; the
    /// per-field notes are inferred from names and types and should be confirmed there.
    #[derive(Debug, serde::Serialize)]
    struct Phase4ProbeRow {
        /// Fixture label (presumably `Phase4ProbeFixture::name`).
        fixture: &'static str,
        /// Number of rows in the probed sheet.
        rows: usize,
        // The next five fields share their names with the `OverlayDebugStats` counters.
        points: usize,
        sparse_fragments: usize,
        dense_fragments: usize,
        run_fragments: usize,
        covered_len: usize,
        /// Estimated overlay size in bytes.
        overlay_estimated_bytes: usize,
        /// Timing of the numeric-lane scan.
        numbers: Phase4ProbeOp,
        /// Timing of the type-tag scan.
        type_tags: Phase4ProbeOp,
        /// Timing of the lowered-text scan.
        lowered_text: Phase4ProbeOp,
        /// Timing of a cell-by-cell scan (presumably via `get_cell`).
        get_cell_scan: Phase4ProbeOp,
        /// Overlay selection statistics captured for the run.
        select_stats: OverlaySelectStats,
    }
4645
4646    fn build_phase4_probe_sheet(rows: usize, fixture: Phase4ProbeFixture) -> ArrowSheet {
4647        let mut builder =
4648            IngestBuilder::new("S", 1, rows.max(1), crate::engine::DateSystem::Excel1900);
4649        for row in 0..rows {
4650            builder
4651                .append_row(&[LiteralValue::Number((row + 1) as f64)])
4652                .unwrap();
4653        }
4654        let mut sheet = builder.finish();
4655        let chunk = sheet.columns[0].chunk_mut(0).unwrap();
4656        match fixture {
4657            Phase4ProbeFixture::PointNumeric => {
4658                for row in 0..rows {
4659                    chunk
4660                        .computed_overlay
4661                        .set_scalar(row, OverlayValue::Number((row + 1) as f64));
4662                }
4663            }
4664            Phase4ProbeFixture::DenseNumeric => {
4665                chunk.computed_overlay.apply_fragment(
4666                    OverlayFragment::dense_range(
4667                        0,
4668                        (0..rows)
4669                            .map(|row| OverlayValue::Number((row + 1) as f64))
4670                            .collect(),
4671                    )
4672                    .unwrap(),
4673                );
4674            }
4675            Phase4ProbeFixture::RunNumeric => {
4676                chunk.computed_overlay.apply_fragment(
4677                    OverlayFragment::run_range(0, vec![OverlayValue::Number(1.0); rows]).unwrap(),
4678                );
4679            }
4680            Phase4ProbeFixture::SparseNumeric => {
4681                chunk.computed_overlay.apply_fragment(
4682                    OverlayFragment::sparse_offsets(
4683                        (0..rows)
4684                            .step_by(10)
4685                            .map(|row| (row, OverlayValue::Number(10.0)))
4686                            .collect(),
4687                    )
4688                    .unwrap(),
4689                );
4690            }
4691            Phase4ProbeFixture::EmptyRun => {
4692                chunk.computed_overlay.apply_fragment(
4693                    OverlayFragment::run_range(0, vec![OverlayValue::Empty; rows]).unwrap(),
4694                );
4695            }
4696            Phase4ProbeFixture::MixedDense => {
4697                let pattern = [
4698                    OverlayValue::Number(1.0),
4699                    OverlayValue::Boolean(true),
4700                    OverlayValue::Text(Arc::from("Alpha")),
4701                    OverlayValue::Empty,
4702                    OverlayValue::Error(map_error_code(ExcelErrorKind::Div)),
4703                    OverlayValue::Pending,
4704                    OverlayValue::DateTime(45000.25),
4705                    OverlayValue::Duration(0.5),
4706                ];
4707                chunk.computed_overlay.apply_fragment(
4708                    OverlayFragment::dense_range(
4709                        0,
4710                        (0..rows)
4711                            .map(|row| pattern[row % pattern.len()].clone())
4712                            .collect(),
4713                    )
4714                    .unwrap(),
4715                );
4716            }
4717        }
4718        sheet
4719    }
4720
4721    fn measure_probe_numbers(sheet: &ArrowSheet, rows: usize) -> Phase4ProbeOp {
4722        let view = sheet.range_view(0, 0, rows.saturating_sub(1), 0);
4723        let start = std::time::Instant::now();
4724        let mut segments = 0usize;
4725        let mut arrays = 0usize;
4726        let mut rows_scanned = 0usize;
4727        let mut checksum = 0.0;
4728        let mut non_null = 0usize;
4729        for segment in view.numbers_slices() {
4730            let (_row_start, row_len, cols) = segment.unwrap();
4731            segments += 1;
4732            rows_scanned += row_len;
4733            for array in cols {
4734                arrays += 1;
4735                for idx in 0..array.len() {
4736                    if array.is_valid(idx) {
4737                        checksum += array.value(idx);
4738                        non_null += 1;
4739                    }
4740                }
4741            }
4742        }
4743        Phase4ProbeOp {
4744            ms: start.elapsed().as_secs_f64() * 1000.0,
4745            segments,
4746            arrays,
4747            rows_scanned,
4748            checksum,
4749            non_null,
4750        }
4751    }
4752
4753    fn measure_probe_type_tags(sheet: &ArrowSheet, rows: usize) -> Phase4ProbeOp {
4754        let view = sheet.range_view(0, 0, rows.saturating_sub(1), 0);
4755        let start = std::time::Instant::now();
4756        let mut segments = 0usize;
4757        let mut arrays = 0usize;
4758        let mut rows_scanned = 0usize;
4759        let mut checksum = 0.0;
4760        let mut non_null = 0usize;
4761        for segment in view.type_tags_slices() {
4762            let (_row_start, row_len, cols) = segment.unwrap();
4763            segments += 1;
4764            rows_scanned += row_len;
4765            for array in cols {
4766                arrays += 1;
4767                for idx in 0..array.len() {
4768                    if array.is_valid(idx) {
4769                        checksum += array.value(idx) as f64;
4770                        non_null += 1;
4771                    }
4772                }
4773            }
4774        }
4775        Phase4ProbeOp {
4776            ms: start.elapsed().as_secs_f64() * 1000.0,
4777            segments,
4778            arrays,
4779            rows_scanned,
4780            checksum,
4781            non_null,
4782        }
4783    }
4784
4785    fn measure_probe_lowered_text(sheet: &ArrowSheet, rows: usize) -> Phase4ProbeOp {
4786        let view = sheet.range_view(0, 0, rows.saturating_sub(1), 0);
4787        let start = std::time::Instant::now();
4788        let mut segments = 0usize;
4789        let mut arrays = 0usize;
4790        let mut rows_scanned = 0usize;
4791        let mut checksum = 0.0;
4792        let mut non_null = 0usize;
4793        for segment in view.lowered_text_slices() {
4794            let (_row_start, row_len, cols) = segment.unwrap();
4795            segments += 1;
4796            rows_scanned += row_len;
4797            for array in cols {
4798                arrays += 1;
4799                for idx in 0..array.len() {
4800                    if array.is_valid(idx) {
4801                        checksum += array.value(idx).len() as f64;
4802                        non_null += 1;
4803                    }
4804                }
4805            }
4806        }
4807        Phase4ProbeOp {
4808            ms: start.elapsed().as_secs_f64() * 1000.0,
4809            segments,
4810            arrays,
4811            rows_scanned,
4812            checksum,
4813            non_null,
4814        }
4815    }
4816
4817    fn literal_probe_weight(value: LiteralValue) -> f64 {
4818        match value {
4819            LiteralValue::Empty => 0.0,
4820            LiteralValue::Int(value) => value as f64,
4821            LiteralValue::Number(value) => value,
4822            LiteralValue::Boolean(value) => {
4823                if value {
4824                    1.0
4825                } else {
4826                    0.0
4827                }
4828            }
4829            LiteralValue::Text(value) => value.len() as f64,
4830            LiteralValue::Error(_) => -1.0,
4831            LiteralValue::Date(value) => value.num_days_from_ce() as f64,
4832            LiteralValue::DateTime(value) => value.and_utc().timestamp() as f64,
4833            LiteralValue::Time(value) => value.num_seconds_from_midnight() as f64,
4834            LiteralValue::Duration(value) => value.num_seconds() as f64,
4835            LiteralValue::Array(values) => values.len() as f64,
4836            LiteralValue::Pending => -2.0,
4837        }
4838    }
4839
4840    fn measure_probe_get_cell(sheet: &ArrowSheet, rows: usize) -> Phase4ProbeOp {
4841        let view = sheet.range_view(0, 0, rows.saturating_sub(1), 0);
4842        let start = std::time::Instant::now();
4843        let mut checksum = 0.0;
4844        for row in 0..rows {
4845            checksum += literal_probe_weight(view.get_cell(row, 0));
4846        }
4847        Phase4ProbeOp {
4848            ms: start.elapsed().as_secs_f64() * 1000.0,
4849            segments: 1,
4850            arrays: 0,
4851            rows_scanned: rows,
4852            checksum,
4853            non_null: rows,
4854        }
4855    }
4856
4857    fn run_phase4_probe_fixture(rows: usize, fixture: Phase4ProbeFixture) -> Phase4ProbeRow {
4858        let sheet = build_phase4_probe_sheet(rows, fixture);
4859        assert_column_overlays_normalized(&sheet, 0);
4860        let stats = column_overlay_stats(&sheet, 0, true);
4861        reset_overlay_select_stats();
4862        let numbers = measure_probe_numbers(&sheet, rows);
4863        let type_tags = measure_probe_type_tags(&sheet, rows);
4864        let lowered_text = measure_probe_lowered_text(&sheet, rows);
4865        let select_stats = snapshot_overlay_select_stats();
4866        let get_cell_scan = measure_probe_get_cell(&sheet, rows);
4867        Phase4ProbeRow {
4868            fixture: fixture.name(),
4869            rows,
4870            points: stats.points,
4871            sparse_fragments: stats.sparse_fragments,
4872            dense_fragments: stats.dense_fragments,
4873            run_fragments: stats.run_fragments,
4874            covered_len: stats.covered_len,
4875            overlay_estimated_bytes: column_computed_overlay_estimated_bytes(&sheet, 0),
4876            numbers,
4877            type_tags,
4878            lowered_text,
4879            get_cell_scan,
4880            select_stats,
4881        }
4882    }
4883
4884    #[test]
4885    #[ignore = "manual Phase 4 observability probe; run with --ignored --nocapture"]
4886    fn phase4_overlay_rangeview_observability_probe() {
4887        let rows = std::env::var("FORMUALIZER_OVERLAY_PROBE_ROWS")
4888            .ok()
4889            .and_then(|value| value.parse::<usize>().ok())
4890            .unwrap_or(100_000)
4891            .max(1);
4892        for fixture in [
4893            Phase4ProbeFixture::PointNumeric,
4894            Phase4ProbeFixture::DenseNumeric,
4895            Phase4ProbeFixture::RunNumeric,
4896            Phase4ProbeFixture::SparseNumeric,
4897            Phase4ProbeFixture::EmptyRun,
4898            Phase4ProbeFixture::MixedDense,
4899        ] {
4900            let row = run_phase4_probe_fixture(rows, fixture);
4901            println!("{}", serde_json::to_string(&row).unwrap());
4902        }
4903    }
4904
4905    #[test]
4906    fn ingest_mixed_rows_into_lanes_and_tags() {
4907        let mut b = IngestBuilder::new("Sheet1", 1, 1024, crate::engine::DateSystem::Excel1900);
4908        let data = vec![
4909            LiteralValue::Number(42.5),                   // Number
4910            LiteralValue::Empty,                          // Empty
4911            LiteralValue::Text(String::new()),            // Empty text (Text lane)
4912            LiteralValue::Boolean(true),                  // Boolean
4913            LiteralValue::Error(ExcelError::new_value()), // Error
4914        ];
4915        for v in &data {
4916            b.append_row(std::slice::from_ref(v)).unwrap();
4917        }
4918        let sheet = b.finish();
4919        assert_eq!(sheet.nrows, 5);
4920        assert_eq!(sheet.columns.len(), 1);
4921        assert_eq!(sheet.columns[0].chunks.len(), 1);
4922        let ch = &sheet.columns[0].chunks[0];
4923
4924        // Type tags
4925        let tags = ch.type_tag.values();
4926        assert_eq!(tags.len(), 5);
4927        assert_eq!(tags[0], TypeTag::Number as u8);
4928        assert_eq!(tags[1], TypeTag::Empty as u8);
4929        assert_eq!(tags[2], TypeTag::Text as u8);
4930        assert_eq!(tags[3], TypeTag::Boolean as u8);
4931        assert_eq!(tags[4], TypeTag::Error as u8);
4932
4933        // Numbers lane validity
4934        let nums = ch.numbers.as_ref().unwrap();
4935        assert_eq!(nums.len(), 5);
4936        assert_eq!(nums.null_count(), 4);
4937        assert!(nums.is_valid(0));
4938
4939        // Booleans lane validity
4940        let bools = ch.booleans.as_ref().unwrap();
4941        assert_eq!(bools.len(), 5);
4942        assert_eq!(bools.null_count(), 4);
4943        assert!(bools.is_valid(3));
4944
4945        // Text lane validity
4946        let txt = ch.text.as_ref().unwrap();
4947        assert_eq!(txt.len(), 5);
4948        assert_eq!(txt.null_count(), 4);
4949        assert!(txt.is_valid(2)); // ""
4950
4951        // Errors lane
4952        let errs = ch.errors.as_ref().unwrap();
4953        assert_eq!(errs.len(), 5);
4954        assert_eq!(errs.null_count(), 4);
4955        assert!(errs.is_valid(4));
4956    }
4957
4958    #[test]
4959    fn range_view_get_cell_and_padding() {
4960        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
4961        b.append_row(&[LiteralValue::Number(1.0), LiteralValue::Text("".into())])
4962            .unwrap();
4963        b.append_row(&[LiteralValue::Empty, LiteralValue::Text("x".into())])
4964            .unwrap();
4965        b.append_row(&[LiteralValue::Boolean(true), LiteralValue::Empty])
4966            .unwrap();
4967        let sheet = b.finish();
4968        let rv = sheet.range_view(0, 0, 2, 1);
4969        assert_eq!(rv.dims(), (3, 2));
4970        // Inside
4971        assert_eq!(rv.get_cell(0, 0), LiteralValue::Number(1.0));
4972        assert_eq!(rv.get_cell(0, 1), LiteralValue::Text(String::new())); // empty string
4973        assert_eq!(rv.get_cell(1, 0), LiteralValue::Empty); // truly Empty
4974        assert_eq!(rv.get_cell(2, 0), LiteralValue::Boolean(true));
4975        // OOB padding
4976        assert_eq!(rv.get_cell(3, 0), LiteralValue::Empty);
4977        assert_eq!(rv.get_cell(0, 2), LiteralValue::Empty);
4978
4979        // Numbers slices should produce one 2-row and one 1-row segment
4980        let nums: Vec<_> = rv.numbers_slices().map(|r| r.unwrap()).collect();
4981        assert_eq!(nums.len(), 2);
4982        assert_eq!(nums[0].0, 0);
4983        assert_eq!(nums[0].1, 2);
4984        assert_eq!(nums[1].0, 2);
4985        assert_eq!(nums[1].1, 1);
4986    }
4987
4988    #[test]
4989    fn overlay_precedence_user_over_computed() {
4990        let mut b = IngestBuilder::new("S", 1, 8, crate::engine::DateSystem::Excel1900);
4991        b.append_row(&[LiteralValue::Number(1.0)]).unwrap();
4992        b.append_row(&[LiteralValue::Empty]).unwrap();
4993        b.append_row(&[LiteralValue::Empty]).unwrap();
4994        let mut sheet = b.finish();
4995
4996        let (ch_i, off) = sheet.chunk_of_row(0).unwrap();
4997        sheet.columns[0].chunks[ch_i]
4998            .computed_overlay
4999            .set(off, OverlayValue::Number(2.0));
5000
5001        let rv0 = sheet.range_view(0, 0, 0, 0);
5002        assert_eq!(rv0.get_cell(0, 0), LiteralValue::Number(2.0));
5003        let nums0: Vec<_> = rv0.numbers_slices().map(|r| r.unwrap()).collect();
5004        assert_eq!(nums0.len(), 1);
5005        assert_eq!(nums0[0].2[0].value(0), 2.0);
5006
5007        sheet.columns[0].chunks[ch_i]
5008            .overlay
5009            .set(off, OverlayValue::Number(3.0));
5010
5011        let rv1 = sheet.range_view(0, 0, 0, 0);
5012        assert_eq!(rv1.get_cell(0, 0), LiteralValue::Number(3.0));
5013        let nums1: Vec<_> = rv1.numbers_slices().map(|r| r.unwrap()).collect();
5014        assert_eq!(nums1.len(), 1);
5015        assert_eq!(nums1[0].2[0].value(0), 3.0);
5016    }
5017
5018    #[test]
5019    fn overlay_slice_preserves_explicit_empty_and_offsets() {
5020        let mut overlay = Overlay::new();
5021        overlay.set(2, OverlayValue::Number(2.0));
5022        overlay.set(4, OverlayValue::Empty);
5023        overlay.set(6, OverlayValue::Text(Arc::from("outside")));
5024
5025        let sliced = overlay.slice(1, 4);
5026        assert!(sliced.get_scalar(0).is_none());
5027        assert_eq!(
5028            sliced.get_scalar(1).unwrap().to_literal(),
5029            LiteralValue::Number(2.0)
5030        );
5031        assert_eq!(
5032            sliced.get_scalar(3).unwrap().to_literal(),
5033            LiteralValue::Empty
5034        );
5035        assert!(sliced.get_scalar(5).is_none());
5036    }
5037
5038    #[test]
5039    fn overlay_cascade_user_empty_masks_computed_and_base() {
5040        let mut user = Overlay::new();
5041        let mut computed = Overlay::new();
5042        computed.set(1, OverlayValue::Number(42.0));
5043        user.set(1, OverlayValue::Empty);
5044
5045        let cascade = OverlayCascade::new(&user, &computed);
5046        assert_eq!(
5047            cascade.get_scalar(1).unwrap().to_literal(),
5048            LiteralValue::Empty
5049        );
5050        assert!(cascade.has_any_in_range(1..2));
5051    }
5052
5053    #[test]
5054    fn overlay_storage_pointmap_backward_compat_get_set_remove() {
5055        let mut overlay = Overlay::new();
5056        assert!(overlay.is_empty());
5057
5058        let delta = overlay.set_scalar(1, OverlayValue::Number(10.0));
5059        assert!(delta > 0);
5060        assert_eq!(overlay.len(), 1);
5061        assert_eq!(
5062            overlay.get_scalar(1).unwrap().to_literal(),
5063            LiteralValue::Number(10.0)
5064        );
5065
5066        let replace_delta = overlay.set_scalar(1, OverlayValue::Text(Arc::from("x")));
5067        assert_ne!(replace_delta, 0);
5068        assert_eq!(overlay.len(), 1);
5069        assert_eq!(
5070            overlay.get_scalar(1).unwrap().to_literal(),
5071            LiteralValue::Text("x".into())
5072        );
5073
5074        let remove_delta = overlay.remove_scalar(1);
5075        assert!(remove_delta < 0);
5076        assert!(overlay.is_empty());
5077        assert!(overlay.get_scalar(1).is_none());
5078    }
5079
5080    #[test]
5081    fn overlay_storage_no_fragments_behavior_matches_old_map() {
5082        let mut overlay = Overlay::new();
5083        overlay.set_scalar(0, OverlayValue::Number(1.0));
5084        overlay.set_scalar(3, OverlayValue::Empty);
5085
5086        assert!(overlay.has_any_in_range(0..1));
5087        assert!(!overlay.has_any_in_range(1..3));
5088        assert!(overlay.has_any_in_range(3..4));
5089
5090        let sliced = overlay.slice(2, 3);
5091        assert!(sliced.get_scalar(0).is_none());
5092        assert_eq!(
5093            sliced.get_scalar(1).unwrap().to_literal(),
5094            LiteralValue::Empty
5095        );
5096    }
5097
5098    #[test]
5099    fn overlay_cascade_user_layer_masks_computed_fragment_regardless_of_sequence() {
5100        let mut user = Overlay::new();
5101        let mut computed = Overlay::new();
5102
5103        user.set_scalar(0, OverlayValue::Number(3.0));
5104        computed.apply_fragment(
5105            OverlayFragment::dense_range(0, vec![OverlayValue::Number(2.0)]).unwrap(),
5106        );
5107
5108        let cascade = OverlayCascade::new(&user, &computed);
5109        assert_eq!(
5110            cascade.get_scalar(0).unwrap().to_literal(),
5111            LiteralValue::Number(3.0)
5112        );
5113    }
5114
5115    #[test]
5116    fn overlay_same_layer_later_point_replaces_fragment_cell() {
5117        let mut overlay = Overlay::new();
5118        overlay.apply_fragment(
5119            OverlayFragment::dense_range(
5120                0,
5121                vec![
5122                    OverlayValue::Number(1.0),
5123                    OverlayValue::Number(2.0),
5124                    OverlayValue::Number(3.0),
5125                ],
5126            )
5127            .unwrap(),
5128        );
5129
5130        overlay.set_scalar(1, OverlayValue::Number(99.0));
5131
5132        assert_eq!(
5133            overlay.get_scalar(0).unwrap().to_literal(),
5134            LiteralValue::Number(1.0)
5135        );
5136        assert_eq!(
5137            overlay.get_scalar(1).unwrap().to_literal(),
5138            LiteralValue::Number(99.0)
5139        );
5140        assert_eq!(
5141            overlay.get_scalar(2).unwrap().to_literal(),
5142            LiteralValue::Number(3.0)
5143        );
5144    }
5145
5146    #[test]
5147    fn overlay_same_layer_later_fragment_replaces_point_range() {
5148        let mut overlay = Overlay::new();
5149        overlay.set_scalar(0, OverlayValue::Number(1.0));
5150        overlay.set_scalar(1, OverlayValue::Number(2.0));
5151        overlay.set_scalar(2, OverlayValue::Number(3.0));
5152
5153        overlay.apply_fragment(
5154            OverlayFragment::dense_range(
5155                0,
5156                vec![
5157                    OverlayValue::Number(10.0),
5158                    OverlayValue::Number(20.0),
5159                    OverlayValue::Number(30.0),
5160                ],
5161            )
5162            .unwrap(),
5163        );
5164
5165        let stats = overlay.debug_stats();
5166        assert_eq!(stats.points, 0);
5167        assert_eq!(stats.dense_fragments, 1);
5168        assert!(overlay.debug_is_normalized());
5169        assert_eq!(
5170            overlay.get_scalar(0).unwrap().to_literal(),
5171            LiteralValue::Number(10.0)
5172        );
5173        assert_eq!(
5174            overlay.get_scalar(1).unwrap().to_literal(),
5175            LiteralValue::Number(20.0)
5176        );
5177        assert_eq!(
5178            overlay.get_scalar(2).unwrap().to_literal(),
5179            LiteralValue::Number(30.0)
5180        );
5181    }
5182
5183    #[test]
5184    fn overlay_sparse_far_apart_replacement_does_not_rewrite_unrelated_dense_fragment() {
5185        let mut overlay = Overlay::new();
5186        overlay.apply_fragment(
5187            OverlayFragment::dense_range(100, vec![OverlayValue::Number(1.0); 10]).unwrap(),
5188        );
5189
5190        overlay.apply_fragment(
5191            OverlayFragment::sparse_offsets(vec![
5192                (0, OverlayValue::Empty),
5193                (1000, OverlayValue::Number(1000.0)),
5194            ])
5195            .unwrap(),
5196        );
5197
5198        let stats = overlay.debug_stats();
5199        assert_eq!(stats.dense_fragments, 1);
5200        assert_eq!(stats.sparse_fragments, 1);
5201        assert_eq!(stats.run_fragments, 0);
5202        assert!(overlay.debug_is_normalized());
5203        assert_eq!(
5204            overlay.get_scalar(105).unwrap().to_literal(),
5205            LiteralValue::Number(1.0)
5206        );
5207        assert_eq!(
5208            overlay.get_scalar(0).unwrap().to_literal(),
5209            LiteralValue::Empty
5210        );
5211        assert_eq!(
5212            overlay.get_scalar(1000).unwrap().to_literal(),
5213            LiteralValue::Number(1000.0)
5214        );
5215    }
5216
5217    #[test]
5218    fn overlay_sparse_offsets_are_sorted_unique_last_write_wins() {
5219        let mut overlay = Overlay::new();
5220        overlay.apply_fragment(
5221            OverlayFragment::sparse_offsets(vec![
5222                (3, OverlayValue::Number(3.0)),
5223                (1, OverlayValue::Number(1.0)),
5224                (3, OverlayValue::Number(33.0)),
5225            ])
5226            .unwrap(),
5227        );
5228
5229        let stats = overlay.debug_stats();
5230        assert_eq!(stats.sparse_fragments, 1);
5231        assert_eq!(overlay.len(), 2);
5232        assert_eq!(
5233            overlay.get_scalar(1).unwrap().to_literal(),
5234            LiteralValue::Number(1.0)
5235        );
5236        assert_eq!(
5237            overlay.get_scalar(3).unwrap().to_literal(),
5238            LiteralValue::Number(33.0)
5239        );
5240        assert!(overlay.debug_is_normalized());
5241    }
5242
5243    #[test]
5244    fn overlay_dense_point_replacement_splits_dense_not_sparse() {
5245        let mut overlay = Overlay::new();
5246        overlay.apply_fragment(
5247            OverlayFragment::dense_range(
5248                0,
5249                (0..6)
5250                    .map(|i| OverlayValue::Number(i as f64))
5251                    .collect::<Vec<_>>(),
5252            )
5253            .unwrap(),
5254        );
5255
5256        overlay.set_scalar(3, OverlayValue::Number(99.0));
5257
5258        let stats = overlay.debug_stats();
5259        assert_eq!(stats.points, 1);
5260        assert_eq!(stats.dense_fragments, 2);
5261        assert_eq!(stats.sparse_fragments, 0);
5262        assert!(overlay.debug_is_normalized());
5263        assert_eq!(
5264            overlay.get_scalar(2).unwrap().to_literal(),
5265            LiteralValue::Number(2.0)
5266        );
5267        assert_eq!(
5268            overlay.get_scalar(3).unwrap().to_literal(),
5269            LiteralValue::Number(99.0)
5270        );
5271        assert_eq!(
5272            overlay.get_scalar(4).unwrap().to_literal(),
5273            LiteralValue::Number(4.0)
5274        );
5275    }
5276
5277    #[test]
5278    fn overlay_dense_fragment_replacement_splits_left_and_right_dense() {
5279        let mut overlay = Overlay::new();
5280        overlay.apply_fragment(
5281            OverlayFragment::dense_range(
5282                0,
5283                (0..8)
5284                    .map(|i| OverlayValue::Number(i as f64))
5285                    .collect::<Vec<_>>(),
5286            )
5287            .unwrap(),
5288        );
5289
5290        overlay.apply_fragment(
5291            OverlayFragment::dense_range(
5292                3,
5293                vec![OverlayValue::Number(30.0), OverlayValue::Number(40.0)],
5294            )
5295            .unwrap(),
5296        );
5297
5298        let stats = overlay.debug_stats();
5299        assert_eq!(stats.points, 0);
5300        assert_eq!(stats.dense_fragments, 3);
5301        assert_eq!(stats.sparse_fragments, 0);
5302        assert!(overlay.debug_is_normalized());
5303        assert_eq!(
5304            overlay.get_scalar(2).unwrap().to_literal(),
5305            LiteralValue::Number(2.0)
5306        );
5307        assert_eq!(
5308            overlay.get_scalar(3).unwrap().to_literal(),
5309            LiteralValue::Number(30.0)
5310        );
5311        assert_eq!(
5312            overlay.get_scalar(4).unwrap().to_literal(),
5313            LiteralValue::Number(40.0)
5314        );
5315        assert_eq!(
5316            overlay.get_scalar(5).unwrap().to_literal(),
5317            LiteralValue::Number(5.0)
5318        );
5319    }
5320
5321    #[test]
5322    fn overlay_run_point_replacement_splits_run_not_sparse() {
5323        let mut overlay = Overlay::new();
5324        overlay.apply_fragment(
5325            OverlayFragment::run_range(0, vec![OverlayValue::Number(1.0); 10]).unwrap(),
5326        );
5327
5328        overlay.set_scalar(5, OverlayValue::Number(99.0));
5329
5330        let stats = overlay.debug_stats();
5331        assert_eq!(stats.points, 1);
5332        assert_eq!(stats.run_fragments, 2);
5333        assert_eq!(stats.sparse_fragments, 0);
5334        assert!(overlay.debug_is_normalized());
5335        assert_eq!(
5336            overlay.get_scalar(4).unwrap().to_literal(),
5337            LiteralValue::Number(1.0)
5338        );
5339        assert_eq!(
5340            overlay.get_scalar(5).unwrap().to_literal(),
5341            LiteralValue::Number(99.0)
5342        );
5343        assert_eq!(
5344            overlay.get_scalar(6).unwrap().to_literal(),
5345            LiteralValue::Number(1.0)
5346        );
5347    }
5348
5349    #[test]
5350    fn overlay_run_fragment_replacement_splits_left_and_right_run() {
5351        let mut overlay = Overlay::new();
5352        let values = [
5353            vec![OverlayValue::Number(1.0); 4],
5354            vec![OverlayValue::Number(2.0); 4],
5355            vec![OverlayValue::Number(3.0); 4],
5356        ]
5357        .concat();
5358        overlay.apply_fragment(OverlayFragment::run_range(0, values).unwrap());
5359
5360        overlay.apply_fragment(
5361            OverlayFragment::dense_range(
5362                5,
5363                vec![OverlayValue::Number(50.0), OverlayValue::Number(60.0)],
5364            )
5365            .unwrap(),
5366        );
5367
5368        let stats = overlay.debug_stats();
5369        assert_eq!(stats.run_fragments, 2);
5370        assert_eq!(stats.dense_fragments, 1);
5371        assert_eq!(stats.sparse_fragments, 0);
5372        assert!(overlay.debug_is_normalized());
5373        assert_eq!(
5374            overlay.get_scalar(4).unwrap().to_literal(),
5375            LiteralValue::Number(2.0)
5376        );
5377        assert_eq!(
5378            overlay.get_scalar(5).unwrap().to_literal(),
5379            LiteralValue::Number(50.0)
5380        );
5381        assert_eq!(
5382            overlay.get_scalar(6).unwrap().to_literal(),
5383            LiteralValue::Number(60.0)
5384        );
5385        assert_eq!(
5386            overlay.get_scalar(7).unwrap().to_literal(),
5387            LiteralValue::Number(2.0)
5388        );
5389    }
5390
5391    #[test]
5392    fn overlay_slice_preserves_dense_and_run_encodings() {
5393        let mut overlay = Overlay::new();
5394        overlay.apply_fragment(
5395            OverlayFragment::dense_range(
5396                10,
5397                (0..5)
5398                    .map(|i| OverlayValue::Number(i as f64))
5399                    .collect::<Vec<_>>(),
5400            )
5401            .unwrap(),
5402        );
5403        overlay.apply_fragment(
5404            OverlayFragment::run_range(
5405                20,
5406                [
5407                    vec![OverlayValue::Number(1.0); 3],
5408                    vec![OverlayValue::Number(2.0); 3],
5409                ]
5410                .concat(),
5411            )
5412            .unwrap(),
5413        );
5414
5415        let dense_slice = overlay.slice(12, 2);
5416        let dense_stats = dense_slice.debug_stats();
5417        assert_eq!(dense_stats.dense_fragments, 1);
5418        assert_eq!(dense_stats.sparse_fragments, 0);
5419        assert_eq!(
5420            dense_slice.get_scalar(0).unwrap().to_literal(),
5421            LiteralValue::Number(2.0)
5422        );
5423        assert_eq!(
5424            dense_slice.get_scalar(1).unwrap().to_literal(),
5425            LiteralValue::Number(3.0)
5426        );
5427        assert!(dense_slice.debug_is_normalized());
5428
5429        let run_slice = overlay.slice(22, 3);
5430        let run_stats = run_slice.debug_stats();
5431        assert_eq!(run_stats.run_fragments, 1);
5432        assert_eq!(run_stats.sparse_fragments, 0);
5433        assert_eq!(
5434            run_slice.get_scalar(0).unwrap().to_literal(),
5435            LiteralValue::Number(1.0)
5436        );
5437        assert_eq!(
5438            run_slice.get_scalar(1).unwrap().to_literal(),
5439            LiteralValue::Number(2.0)
5440        );
5441        assert_eq!(
5442            run_slice.get_scalar(2).unwrap().to_literal(),
5443            LiteralValue::Number(2.0)
5444        );
5445        assert!(run_slice.debug_is_normalized());
5446    }
5447
5448    #[test]
5449    fn overlay_computed_empty_run_masks_non_empty_base() {
5450        let mut b = IngestBuilder::new("S", 1, 8, crate::engine::DateSystem::Excel1900);
5451        b.append_row(&[LiteralValue::Number(1.0)]).unwrap();
5452        b.append_row(&[LiteralValue::Number(2.0)]).unwrap();
5453        b.append_row(&[LiteralValue::Number(3.0)]).unwrap();
5454        let mut sheet = b.finish();
5455
5456        let (ch_i, _) = sheet.chunk_of_row(0).unwrap();
5457        sheet.columns[0].chunks[ch_i]
5458            .computed_overlay
5459            .apply_fragment(
5460                OverlayFragment::run_range(
5461                    0,
5462                    vec![
5463                        OverlayValue::Empty,
5464                        OverlayValue::Empty,
5465                        OverlayValue::Empty,
5466                    ],
5467                )
5468                .unwrap(),
5469            );
5470
5471        assert_eq!(sheet.get_cell_value(0, 0), LiteralValue::Empty);
5472        assert_eq!(sheet.get_cell_value(1, 0), LiteralValue::Empty);
5473        assert_eq!(sheet.get_cell_value(2, 0), LiteralValue::Empty);
5474    }
5475
5476    #[test]
5477    fn overlay_fragments_reconstruct_scalars_from_typed_lanes() {
5478        let values = vec![
5479            OverlayValue::Empty,
5480            OverlayValue::Number(1.5),
5481            OverlayValue::DateTime(45000.25),
5482            OverlayValue::Duration(0.5),
5483            OverlayValue::Boolean(true),
5484            OverlayValue::Text(Arc::from("Hello")),
5485            OverlayValue::Error(map_error_code(ExcelErrorKind::Div)),
5486            OverlayValue::Pending,
5487        ];
5488
5489        let mut dense = Overlay::new();
5490        dense.apply_fragment(OverlayFragment::dense_range(0, values.clone()).unwrap());
5491        for (idx, expected) in values.iter().enumerate() {
5492            assert_eq!(
5493                dense.get_scalar(idx).unwrap().to_overlay_value(),
5494                expected.clone()
5495            );
5496        }
5497
5498        let mut sparse = Overlay::new();
5499        sparse.apply_fragment(
5500            OverlayFragment::sparse_offsets(
5501                values
5502                    .iter()
5503                    .cloned()
5504                    .enumerate()
5505                    .map(|(idx, value)| (idx * 2, value))
5506                    .collect(),
5507            )
5508            .unwrap(),
5509        );
5510        for (idx, expected) in values.iter().enumerate() {
5511            assert_eq!(
5512                sparse.get_scalar(idx * 2).unwrap().to_overlay_value(),
5513                expected.clone()
5514            );
5515        }
5516
5517        let mut run = Overlay::new();
5518        run.apply_fragment(
5519            OverlayFragment::run_range(
5520                0,
5521                vec![
5522                    OverlayValue::Number(7.0),
5523                    OverlayValue::Number(7.0),
5524                    OverlayValue::Text(Arc::from("run")),
5525                    OverlayValue::Text(Arc::from("run")),
5526                ],
5527            )
5528            .unwrap(),
5529        );
5530        assert_eq!(
5531            run.get_scalar(0).unwrap().to_overlay_value(),
5532            OverlayValue::Number(7.0)
5533        );
5534        assert_eq!(
5535            run.get_scalar(2).unwrap().to_overlay_value(),
5536            OverlayValue::Text(Arc::from("run"))
5537        );
5538    }
5539
5540    #[test]
5541    fn overlay_iter_returns_complete_logical_entries() {
5542        let mut overlay = Overlay::new();
5543        overlay.apply_fragment(
5544            OverlayFragment::dense_range(
5545                2,
5546                vec![OverlayValue::Number(2.0), OverlayValue::Number(3.0)],
5547            )
5548            .unwrap(),
5549        );
5550        overlay.set_scalar(5, OverlayValue::Text(Arc::from("point")));
5551
5552        let entries: Vec<_> = overlay.iter().collect();
5553        assert_eq!(
5554            entries,
5555            vec![
5556                (2, OverlayValue::Number(2.0)),
5557                (3, OverlayValue::Number(3.0)),
5558                (5, OverlayValue::Text(Arc::from("point"))),
5559            ]
5560        );
5561        assert_eq!(overlay.iter_points().count(), 1);
5562    }
5563
#[test]
fn overlay_fragment_estimates_follow_encoded_shapes() {
    // Baseline: 512 values written one scalar at a time.
    let mut points = Overlay::new();
    for idx in 0..512 {
        points.set_scalar(idx, OverlayValue::Number(idx as f64));
    }

    // The same 512 values applied as a single dense fragment.
    let dense_values = (0..512)
        .map(|idx| OverlayValue::Number(idx as f64))
        .collect::<Vec<_>>();
    let mut dense = Overlay::new();
    dense.apply_fragment(OverlayFragment::dense_range(0, dense_values).unwrap());
    let dense_tracked = dense.estimated_bytes();
    let dense_recomputed = dense.debug_recomputed_estimated_bytes();
    assert_eq!(dense_tracked, dense_recomputed);
    assert!(
        dense.estimated_bytes() < points.estimated_bytes(),
        "dense fragment should account like encoded lanes, not point-map entries"
    );

    // Two run fragments holding one repeated value: 8 rows vs. 4096 rows.
    let short_fragment =
        OverlayFragment::run_range(0, vec![OverlayValue::Number(1.0); 8]).unwrap();
    let long_fragment =
        OverlayFragment::run_range(0, vec![OverlayValue::Number(1.0); 4096]).unwrap();
    let mut short_run = Overlay::new();
    short_run.apply_fragment(short_fragment);
    let mut long_run = Overlay::new();
    long_run.apply_fragment(long_fragment);

    let short_tracked = short_run.estimated_bytes();
    let short_recomputed = short_run.debug_recomputed_estimated_bytes();
    assert_eq!(short_tracked, short_recomputed);
    let long_tracked = long_run.estimated_bytes();
    let long_recomputed = long_run.debug_recomputed_estimated_bytes();
    assert_eq!(long_tracked, long_recomputed);
    assert_eq!(
        short_run.estimated_bytes(),
        long_run.estimated_bytes(),
        "single-run estimate should scale with run count, not covered rows"
    );

    // Sparse fragments: doubling the entry count must raise the estimate.
    let sparse10 = OverlayFragment::sparse_offsets(
        (0..10)
            .map(|idx| (idx * 3, OverlayValue::Number(idx as f64)))
            .collect(),
    )
    .unwrap();
    let sparse20 = OverlayFragment::sparse_offsets(
        (0..20)
            .map(|idx| (idx * 3, OverlayValue::Number(idx as f64)))
            .collect(),
    )
    .unwrap();
    assert!(sparse20.estimated_bytes() > sparse10.estimated_bytes());
}
5626
#[test]
fn overlay_estimated_bytes_stay_consistent_after_split_and_clear() {
    // Seed the overlay with one 16-row dense fragment.
    let seed_values = (0..16)
        .map(|idx| OverlayValue::Number(idx as f64))
        .collect::<Vec<_>>();
    let mut overlay = Overlay::new();
    overlay.apply_fragment(OverlayFragment::dense_range(0, seed_values).unwrap());
    let tracked = overlay.estimated_bytes();
    let recomputed = overlay.debug_recomputed_estimated_bytes();
    assert_eq!(tracked, recomputed);

    // Overwrite a row in the middle of the fragment with a value of another type.
    overlay.set_scalar(8, OverlayValue::Text(Arc::from("split")));
    assert!(overlay.debug_is_normalized());
    let tracked = overlay.estimated_bytes();
    let recomputed = overlay.debug_recomputed_estimated_bytes();
    assert_eq!(tracked, recomputed);

    // Patch both ends of the original range through a sparse fragment.
    let edge_patch = OverlayFragment::sparse_offsets(vec![
        (0, OverlayValue::Empty),
        (15, OverlayValue::Boolean(true)),
    ])
    .unwrap();
    overlay.apply_fragment(edge_patch);
    assert!(overlay.debug_is_normalized());
    let tracked = overlay.estimated_bytes();
    let recomputed = overlay.debug_recomputed_estimated_bytes();
    assert_eq!(tracked, recomputed);

    // Clearing reports a non-zero amount and leaves both estimates at zero.
    let freed = overlay.clear_all();
    assert!(freed > 0);
    let tracked_after_clear = overlay.estimated_bytes();
    assert_eq!(tracked_after_clear, 0);
    let recomputed_after_clear = overlay.debug_recomputed_estimated_bytes();
    assert_eq!(recomputed_after_clear, 0);
    assert!(overlay.is_empty());
}
5670
#[test]
fn overlay_segment_numbers_masks_base_for_non_numeric_overlays() {
    // Rows 1..=4 each receive a non-numeric user overlay; row 0 is left untouched.
    let non_numeric = vec![
        (1, OverlayValue::Text(Arc::from("x"))),
        (2, OverlayValue::Empty),
        (3, OverlayValue::Error(map_error_code(ExcelErrorKind::Div))),
        (4, OverlayValue::Pending),
    ];
    let mut user = Overlay::new();
    for (row, value) in non_numeric {
        user.set(row, value);
    }
    let computed = Overlay::new();
    let cascade = OverlayCascade::new(&user, &computed);

    let base = Float64Array::from(vec![10.0, 20.0, 30.0, 40.0, 50.0]);
    let selected = cascade.select_numbers(0..5, &base);

    // The untouched row keeps the base number; every overlaid row reads as null.
    let untouched = selected.value(0);
    assert_eq!(untouched, 10.0);
    assert!(selected.is_null(1));
    assert!(selected.is_null(2));
    assert!(selected.is_null(3));
    assert!(selected.is_null(4));
}
5689
#[test]
fn overlay_segment_type_tags_preserve_temporal_tags() {
    // Only the computed layer carries values: a DateTime at row 0, a Duration at row 1.
    let user = Overlay::new();
    let mut computed = Overlay::new();
    computed.set(0, OverlayValue::DateTime(45000.5));
    computed.set(1, OverlayValue::Duration(0.25));
    let cascade = OverlayCascade::new(&user, &computed);

    // The base lane is all-Empty, so any temporal tag must come from the overlay.
    let empty_tag = TypeTag::Empty as u8;
    let base = UInt8Array::from(vec![empty_tag; 2]);
    let selected = cascade.select_type_tags(0..2, &base);

    let datetime_tag = TypeTag::DateTime as u8;
    let duration_tag = TypeTag::Duration as u8;
    assert_eq!(selected.value(0), datetime_tag);
    assert_eq!(selected.value(1), duration_tag);
}
5703
#[test]
fn overlay_lowered_text_matches_existing_overlay_semantics() {
    // One user overlay per row: mixed-case text, a number, a boolean, and Empty.
    let user_values = vec![
        (0, OverlayValue::Text(Arc::from("HeLLo"))),
        (1, OverlayValue::Number(1.5)),
        (2, OverlayValue::Boolean(true)),
        (3, OverlayValue::Empty),
    ];
    let mut user = Overlay::new();
    for (row, value) in user_values {
        user.set(row, value);
    }
    let computed = Overlay::new();
    let cascade = OverlayCascade::new(&user, &computed);

    let base = StringArray::from(vec![Some("A"), Some("B"), Some("C"), Some("D")]);
    let selected = cascade.select_lowered_text(0..4, &base);

    // Text is lower-cased, number and boolean are rendered as text, Empty is null.
    let lowered_text = selected.value(0);
    assert_eq!(lowered_text, "hello");
    let number_text = selected.value(1);
    assert_eq!(number_text, "1.5");
    let boolean_text = selected.value(2);
    assert_eq!(boolean_text, "true");
    assert!(selected.is_null(3));
}
5721
5722    fn numeric_sheet(rows: usize) -> ArrowSheet {
5723        let mut b = IngestBuilder::new("S", 1, rows.max(1), crate::engine::DateSystem::Excel1900);
5724        for row in 0..rows {
5725            b.append_row(&[LiteralValue::Number((row + 1) as f64)])
5726                .unwrap();
5727        }
5728        b.finish()
5729    }
5730
5731    fn numbers_for_range(sheet: &ArrowSheet, sr: usize, er: usize) -> Arc<Float64Array> {
5732        let view = sheet.range_view(sr, 0, er, 0);
5733        let segments: Vec<_> = view.numbers_slices().map(|res| res.unwrap()).collect();
5734        assert_eq!(segments.len(), 1);
5735        assert_eq!(segments[0].2.len(), 1);
5736        segments[0].2[0].clone()
5737    }
5738
5739    fn type_tags_for_range(sheet: &ArrowSheet, sr: usize, er: usize) -> Arc<UInt8Array> {
5740        let view = sheet.range_view(sr, 0, er, 0);
5741        let segments: Vec<_> = view.type_tags_slices().map(|res| res.unwrap()).collect();
5742        assert_eq!(segments.len(), 1);
5743        assert_eq!(segments[0].2.len(), 1);
5744        segments[0].2[0].clone()
5745    }
5746
5747    fn lowered_for_range(sheet: &ArrowSheet, sr: usize, er: usize) -> Arc<StringArray> {
5748        let view = sheet.range_view(sr, 0, er, 0);
5749        let segments: Vec<_> = view.lowered_text_slices().map(|res| res.unwrap()).collect();
5750        assert_eq!(segments.len(), 1);
5751        assert_eq!(segments[0].2.len(), 1);
5752        segments[0].2[0].clone()
5753    }
5754
#[test]
fn rangeview_dense_text_masks_base_numbers() {
    let mut sheet = numeric_sheet(4);

    // Cover all four numeric base rows with a dense computed fragment of text.
    let text_values = ["x", "y", "z", "w"]
        .iter()
        .map(|label| OverlayValue::Text(Arc::from(*label)))
        .collect::<Vec<_>>();
    let text_fragment = OverlayFragment::dense_range(0, text_values).unwrap();
    sheet.columns[0].chunks[0]
        .computed_overlay
        .apply_fragment(text_fragment);

    reset_overlay_select_stats();
    let numbers = numbers_for_range(&sheet, 0, 3);
    let masked_rows = numbers.null_count();
    assert_eq!(masked_rows, 4);

    // Expected counters: one direct dense slice and no zip-select call.
    let stats = snapshot_overlay_select_stats();
    assert_eq!((stats.direct_dense_slices, stats.zip_select_calls), (1, 0));
}
5778
#[test]
fn rangeview_empty_dense_masks_base_all_selectors() {
    let mut sheet = numeric_sheet(3);

    // A dense computed fragment of explicit Empty values over every base row.
    let all_empty = OverlayFragment::dense_range(0, vec![OverlayValue::Empty; 3]).unwrap();
    sheet.columns[0].chunks[0]
        .computed_overlay
        .apply_fragment(all_empty);

    reset_overlay_select_stats();
    let numbers = numbers_for_range(&sheet, 0, 2);
    let type_tags = type_tags_for_range(&sheet, 0, 2);
    let lowered = lowered_for_range(&sheet, 0, 2);

    // Numbers and lowered text are fully null; the tag lane reports Empty.
    let null_numbers = numbers.null_count();
    assert_eq!(null_numbers, 3);
    let null_lowered = lowered.null_count();
    assert_eq!(null_lowered, 3);
    let empty_tag = TypeTag::Empty as u8;
    assert_eq!(type_tags.values(), &[empty_tag; 3]);

    // One direct dense slice per selector call, no zip-select call.
    let stats = snapshot_overlay_select_stats();
    assert_eq!((stats.direct_dense_slices, stats.zip_select_calls), (3, 0));
}
5797
#[test]
fn rangeview_pending_masks_base_type_tag_present_lanes_null() {
    let mut sheet = numeric_sheet(2);

    // A dense computed fragment of Pending values over both base rows.
    let all_pending = OverlayFragment::dense_range(0, vec![OverlayValue::Pending; 2]).unwrap();
    sheet.columns[0].chunks[0]
        .computed_overlay
        .apply_fragment(all_pending);

    reset_overlay_select_stats();
    let numbers = numbers_for_range(&sheet, 0, 1);
    let type_tags = type_tags_for_range(&sheet, 0, 1);
    let lowered = lowered_for_range(&sheet, 0, 1);

    // Value lanes are null while the tag lane still reports Pending.
    let null_numbers = numbers.null_count();
    assert_eq!(null_numbers, 2);
    let null_lowered = lowered.null_count();
    assert_eq!(null_lowered, 2);
    let pending_tag = TypeTag::Pending as u8;
    assert_eq!(type_tags.values(), &[pending_tag; 2]);

    // One direct dense slice per selector call, no zip-select call.
    let stats = snapshot_overlay_select_stats();
    assert_eq!((stats.direct_dense_slices, stats.zip_select_calls), (3, 0));
}
5816
#[test]
fn rangeview_subrange_inside_dense_fragment_uses_direct_path() {
    let mut sheet = numeric_sheet(10);

    // Computed rows 0..10 hold 10.0..=19.0, i.e. row index + 10.
    let shifted: Vec<OverlayValue> = (10..20)
        .map(|value| OverlayValue::Number(value as f64))
        .collect();
    let fragment = OverlayFragment::dense_range(0, shifted).unwrap();
    sheet.columns[0].chunks[0]
        .computed_overlay
        .apply_fragment(fragment);

    // Read rows 2..=6, which lie strictly inside the fragment.
    reset_overlay_select_stats();
    let numbers = numbers_for_range(&sheet, 2, 6);
    assert_eq!(numbers.len(), 5);
    let (first, last) = (numbers.value(0), numbers.value(4));
    assert_eq!(first, 12.0);
    assert_eq!(last, 16.0);

    let stats = snapshot_overlay_select_stats();
    assert_eq!((stats.direct_dense_slices, stats.zip_select_calls), (1, 0));
}
5839
#[test]
fn rangeview_subrange_inside_run_fragment_uses_direct_path() {
    let mut sheet = numeric_sheet(10);

    // A single run fragment repeating 7.0 over all ten rows.
    let run = OverlayFragment::run_range(0, vec![OverlayValue::Number(7.0); 10]).unwrap();
    sheet.columns[0].chunks[0]
        .computed_overlay
        .apply_fragment(run);

    // Read rows 2..=6, which lie strictly inside the run.
    reset_overlay_select_stats();
    let numbers = numbers_for_range(&sheet, 2, 6);
    assert_eq!(numbers.len(), 5);
    (0..numbers.len()).for_each(|idx| assert_eq!(numbers.value(idx), 7.0));

    let stats = snapshot_overlay_select_stats();
    assert_eq!(
        (stats.direct_run_materializations, stats.zip_select_calls),
        (1, 0)
    );
}
5857
#[test]
fn rangeview_user_partial_wrong_type_masks_computed_numeric() {
    let mut sheet = numeric_sheet(5);

    // Computed layer: rows 0..5 hold 10.0..=14.0. User layer: text at row 2 only.
    let computed_values: Vec<OverlayValue> = (10..15)
        .map(|value| OverlayValue::Number(value as f64))
        .collect();
    let computed_fragment = OverlayFragment::dense_range(0, computed_values).unwrap();
    let user_fragment =
        OverlayFragment::dense_range(2, vec![OverlayValue::Text(Arc::from("mask"))]).unwrap();
    {
        let chunk = &mut sheet.columns[0].chunks[0];
        chunk.computed_overlay.apply_fragment(computed_fragment);
        chunk.overlay.apply_fragment(user_fragment);
    }

    reset_overlay_select_stats();
    let numbers = numbers_for_range(&sheet, 0, 4);

    // The user text nulls out row 2; every other row shows the computed number.
    let before_mask = (numbers.value(0), numbers.value(1));
    assert_eq!(before_mask, (10.0, 11.0));
    assert!(numbers.is_null(2));
    let after_mask = (numbers.value(3), numbers.value(4));
    assert_eq!(after_mask, (13.0, 14.0));

    let stats = snapshot_overlay_select_stats();
    assert_eq!(stats.direct_dense_slices, 0);
    assert_eq!(stats.zip_select_calls, 1);
    assert_eq!(stats.partial_dense_intersections, 2);
}
5887
#[test]
fn rangeview_computed_full_cover_user_no_overlap_uses_computed_direct() {
    let mut sheet = numeric_sheet(5);

    // Computed fragment covers rows 0..5; the only user entry sits at offset 10,
    // outside the range that is read below.
    let computed_fragment =
        OverlayFragment::dense_range(0, vec![OverlayValue::Number(3.0); 5]).unwrap();
    {
        let chunk = &mut sheet.columns[0].chunks[0];
        chunk.computed_overlay.apply_fragment(computed_fragment);
        chunk
            .overlay
            .set_scalar(10, OverlayValue::Text(Arc::from("outside")));
    }

    reset_overlay_select_stats();
    let numbers = numbers_for_range(&sheet, 0, 4);
    let (first, last) = (numbers.value(0), numbers.value(4));
    assert_eq!(first, 3.0);
    assert_eq!(last, 3.0);

    let stats = snapshot_overlay_select_stats();
    assert_eq!((stats.direct_dense_slices, stats.zip_select_calls), (1, 0));
}
5907
#[test]
fn rangeview_user_full_cover_ignores_computed() {
    let mut sheet = numeric_sheet(4);

    // Both layers cover rows 0..4: computed holds numbers, user holds text.
    let computed_fragment =
        OverlayFragment::dense_range(0, vec![OverlayValue::Number(99.0); 4]).unwrap();
    let user_fragment =
        OverlayFragment::dense_range(0, vec![OverlayValue::Text(Arc::from("user")); 4]).unwrap();
    {
        let chunk = &mut sheet.columns[0].chunks[0];
        chunk.computed_overlay.apply_fragment(computed_fragment);
        chunk.overlay.apply_fragment(user_fragment);
    }

    // The numeric lane must be entirely null: the user text wins on every row.
    reset_overlay_select_stats();
    let numbers = numbers_for_range(&sheet, 0, 3);
    let masked_rows = numbers.null_count();
    assert_eq!(masked_rows, 4);

    let stats = snapshot_overlay_select_stats();
    assert_eq!((stats.direct_dense_slices, stats.zip_select_calls), (1, 0));
}
5927
#[test]
fn rangeview_point_overlay_still_matches_legacy_scalar_path() {
    let mut sheet = numeric_sheet(3);

    // A single scalar (point) text entry on the middle row of the computed layer.
    let point_value = OverlayValue::Text(Arc::from("point"));
    sheet.columns[0].chunks[0]
        .computed_overlay
        .set_scalar(1, point_value);

    reset_overlay_select_stats();
    let numbers = numbers_for_range(&sheet, 0, 2);

    // Neighbouring rows keep their base numbers; the overlaid row is null.
    let first = numbers.value(0);
    assert_eq!(first, 1.0);
    assert!(numbers.is_null(1));
    let last = numbers.value(2);
    assert_eq!(last, 3.0);

    let stats = snapshot_overlay_select_stats();
    assert_eq!(
        (
            stats.zip_select_calls,
            stats.point_entries_applied,
            stats.row_scalar_fallbacks
        ),
        (1, 1, 0)
    );
}
5945
#[test]
fn rangeview_multi_fragment_full_union_does_not_use_direct_path() {
    let mut sheet = numeric_sheet(4);

    // Two separately applied dense fragments that together cover rows 0..4.
    let lower_half = OverlayFragment::dense_range(0, vec![OverlayValue::Number(10.0); 2]).unwrap();
    let upper_half = OverlayFragment::dense_range(2, vec![OverlayValue::Number(20.0); 2]).unwrap();
    {
        let chunk = &mut sheet.columns[0].chunks[0];
        chunk.computed_overlay.apply_fragment(lower_half);
        chunk.computed_overlay.apply_fragment(upper_half);
    }

    reset_overlay_select_stats();
    let numbers = numbers_for_range(&sheet, 0, 3);
    for (row, expected) in [10.0, 10.0, 20.0, 20.0].iter().enumerate() {
        assert_eq!(numbers.value(row), *expected);
    }

    // Expected counters: no direct slice, one zip-select, two partial intersections.
    let stats = snapshot_overlay_select_stats();
    assert_eq!(stats.direct_dense_slices, 0);
    assert_eq!(stats.zip_select_calls, 1);
    assert_eq!(stats.partial_dense_intersections, 2);
}
5968
#[test]
fn rangeview_lowered_text_fragment_semantics_match_scalar_semantics() {
    let mut sheet = numeric_sheet(8);

    // One dense computed fragment carrying one value of each overlay kind.
    let mixed_values = vec![
        OverlayValue::Text(Arc::from("HeLLo")),
        OverlayValue::Number(1.5),
        OverlayValue::DateTime(45000.25),
        OverlayValue::Duration(0.5),
        OverlayValue::Boolean(true),
        OverlayValue::Empty,
        OverlayValue::Error(map_error_code(ExcelErrorKind::Div)),
        OverlayValue::Pending,
    ];
    let fragment = OverlayFragment::dense_range(0, mixed_values).unwrap();
    sheet.columns[0].chunks[0]
        .computed_overlay
        .apply_fragment(fragment);

    reset_overlay_select_stats();
    let lowered = lowered_for_range(&sheet, 0, 7);

    // Rows 0..5 have a lowered-text rendering; Empty, Error and Pending are null.
    let rendered = ["hello", "1.5", "45000.25", "0.5", "true"];
    for (row, expected) in rendered.iter().enumerate() {
        assert_eq!(lowered.value(row), *expected);
    }
    assert!(lowered.is_null(5));
    assert!(lowered.is_null(6));
    assert!(lowered.is_null(7));

    let stats = snapshot_overlay_select_stats();
    assert_eq!((stats.direct_dense_slices, stats.zip_select_calls), (1, 0));
}
6003
#[test]
fn row_chunk_slices_shape() {
    // Two columns with a chunk size of 2: three rows must come back as two slices.
    let mut builder = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
    for (label, number) in [("a", 1.0), ("b", 2.0), ("c", 3.0)] {
        builder
            .append_row(&[LiteralValue::Text(label.into()), LiteralValue::Number(number)])
            .unwrap();
    }
    let sheet = builder.finish();

    let rv = sheet.range_view(0, 0, 2, 1);
    let slices: Vec<_> = rv.iter_row_chunks().map(|chunk| chunk.unwrap()).collect();
    assert_eq!(slices.len(), 2);

    // (row_start, row_len) expected for each slice; both slices span two columns.
    let expected_shapes = [(0, 2), (2, 1)];
    for (slice, (row_start, row_len)) in slices.iter().zip(expected_shapes) {
        assert_eq!(slice.row_start, row_start);
        assert_eq!(slice.row_len, row_len);
        assert_eq!(slice.cols.len(), 2);
    }
}
6025
#[test]
fn oob_columns_are_padded() {
    // The sheet has 2 columns; the view below asks for 3 (end column past the last one).
    let mut builder = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
    for (number, label) in [(1.0, "a"), (2.0, "b")] {
        builder
            .append_row(&[LiteralValue::Number(number), LiteralValue::Text(label.into())])
            .unwrap();
    }
    let sheet = builder.finish();

    // Columns [0..=2] -> the view reports three columns.
    let rv = sheet.range_view(0, 0, 1, 2);
    assert_eq!(rv.dims(), (2, 3));

    let slices: Vec<_> = rv.iter_row_chunks().map(|chunk| chunk.unwrap()).collect();
    assert!(!slices.is_empty());
    slices.iter().for_each(|cs| assert_eq!(cs.cols.len(), 3));

    // Every typed-slice iterator must also hand back three entries per segment.
    rv.numbers_slices().for_each(|res| {
        let (_, _, cols) = res.unwrap();
        assert_eq!(cols.len(), 3);
    });
    rv.booleans_slices().for_each(|res| {
        let (_, _, cols) = res.unwrap();
        assert_eq!(cols.len(), 3);
    });
    rv.text_slices().for_each(|res| {
        let (_, _, cols) = res.unwrap();
        assert_eq!(cols.len(), 3);
    });
    rv.errors_slices().for_each(|res| {
        let (_, _, cols) = res.unwrap();
        assert_eq!(cols.len(), 3);
    });
    rv.lowered_text_slices().for_each(|res| {
        let (_, _, cols) = res.unwrap();
        assert_eq!(cols.len(), 3);
    });
}
6065
#[test]
fn reversed_range_is_empty() {
    let mut builder = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
    for value in [1.0, 2.0] {
        builder.append_row(&[LiteralValue::Number(value)]).unwrap();
    }
    let sheet = builder.finish();

    // End row (1) precedes start row (3): the view must behave as empty.
    let rv = sheet.range_view(3, 0, 1, 0);
    let dims = rv.dims();
    assert_eq!(dims, (0, 0));
    assert!(rv.iter_row_chunks().next().is_none());
    let cell = rv.get_cell(0, 0);
    assert_eq!(cell, LiteralValue::Empty);
}
6077
#[test]
fn chunk_alignment_invariant() {
    // Three columns of differing value kinds, chunk size 2.
    let mut b = IngestBuilder::new("S", 3, 2, crate::engine::DateSystem::Excel1900);
    // 5 rows, 2-row chunks => 3 chunks (2,2,1)
    for r in 0..5 {
        b.append_row(&[
            LiteralValue::Number(r as f64),
            LiteralValue::Text(format!("{r}")),
            // Third column alternates Empty / Boolean so its lanes are only partly filled.
            if r % 2 == 0 {
                LiteralValue::Empty
            } else {
                LiteralValue::Boolean(true)
            },
        ])
        .unwrap();
    }
    let sheet = b.finish();
    // chunk_starts should be [0,2,4]
    assert_eq!(sheet.chunk_starts, vec![0, 2, 4]);
    // All columns must share per-chunk lengths equal to [2,2,1]
    let lens0: Vec<usize> = sheet.columns[0]
        .chunks
        .iter()
        .map(|ch| ch.type_tag.len())
        .collect();
    // Pin the reference lengths themselves; comparing columns only against each
    // other would also pass if every column were chunked identically but wrongly.
    assert_eq!(lens0, vec![2, 2, 1]);
    for col in &sheet.columns[1..] {
        let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
        assert_eq!(lens, lens0);
    }
}
6108
#[test]
fn chunking_splits_rows() {
    // Two columns, chunk size 2, three rows -> two chunks per column.
    let rows = vec![
        vec![LiteralValue::Number(1.0), LiteralValue::Text("a".into())],
        vec![LiteralValue::Empty, LiteralValue::Text("b".into())],
        vec![LiteralValue::Boolean(true), LiteralValue::Empty],
    ];
    let mut builder = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
    for row in &rows {
        builder.append_row(row).unwrap();
    }
    let sheet = builder.finish();

    let chunk_counts: Vec<usize> = sheet.columns[..2]
        .iter()
        .map(|col| col.chunks.len())
        .collect();
    assert_eq!(chunk_counts, vec![2, 2]);

    // The first chunk holds two rows; the remaining row lands in the second chunk.
    let first_column = &sheet.columns[0];
    assert_eq!(first_column.chunks[0].numbers_or_null().len(), 2);
    assert_eq!(first_column.chunks[1].numbers_or_null().len(), 1);
}
6127
#[test]
fn pending_is_not_error() {
    let sheet = {
        let mut builder = IngestBuilder::new("S", 1, 8, crate::engine::DateSystem::Excel1900);
        builder.append_row(&[LiteralValue::Pending]).unwrap();
        builder.finish()
    };
    let chunk = &sheet.columns[0].chunks[0];

    // The row is tagged Pending ...
    let first_tag = chunk.type_tag.values()[0];
    assert_eq!(first_tag, super::TypeTag::Pending as u8);

    // ... and contributes nothing to the errors lane, which stays null.
    let error_lane = chunk.errors_or_null();
    let null_errors = error_lane.null_count();
    assert_eq!(null_errors, 1);
}
6140
#[test]
fn all_null_numeric_lane_uses_null_array() {
    // A text-only column: the numbers lane must be all null yet keep the Float64 dtype.
    let mut builder = IngestBuilder::new("S", 1, 16, crate::engine::DateSystem::Excel1900);
    for text in ["a", "", "b"] {
        builder
            .append_row(&[LiteralValue::Text(text.into())])
            .unwrap();
    }
    let sheet = builder.finish();

    let nums = sheet.columns[0].chunks[0].numbers_or_null();
    let (len, nulls) = (nums.len(), nums.null_count());
    assert_eq!(len, 3);
    assert_eq!(nulls, 3);
    assert_eq!(nums.data_type(), &DataType::Float64);
}
6155
#[test]
fn row_insert_delete_across_chunk_boundaries_with_overlays() {
    // One column, chunk size 4, 10 rows -> chunks cover [0..4], [4..8], [8..10].
    let mut builder = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
    for _ in 0..10 {
        builder.append_row(&[LiteralValue::Empty]).unwrap();
    }
    let mut sheet = builder.finish();

    // User overlays on both sides of the first chunk boundary: rows 3 and 4.
    for (row, value) in [(3, 30.0), (4, 40.0)] {
        let (chunk_idx, offset) = sheet.chunk_of_row(row).unwrap();
        sheet.columns[0].chunks[chunk_idx]
            .overlay
            .set(offset, OverlayValue::Number(value));
    }

    // Insert 2 rows before row 4, i.e. exactly at the chunk boundary.
    sheet.insert_rows(4, 2);
    assert_eq!(sheet.nrows, 12);
    // Row 3 keeps its value, row 4 is now blank, the former row 4 sits at row 6.
    let last_row = (sheet.nrows - 1) as usize;
    let av = sheet.range_view(0, 0, last_row, 0);
    let expected_after_insert = [
        (3, LiteralValue::Number(30.0)),
        (4, LiteralValue::Empty),
        (6, LiteralValue::Number(40.0)),
    ];
    for (row, expected) in expected_after_insert {
        assert_eq!(av.get_cell(row, 0), expected);
    }

    // Delete rows 3, 4 and 5: the 40.0 overlay moves from row 6 up to row 3.
    sheet.delete_rows(3, 3);
    assert_eq!(sheet.nrows, 9);
    let last_row = (sheet.nrows - 1) as usize;
    let av2 = sheet.range_view(0, 0, last_row, 0);
    assert_eq!(av2.get_cell(3, 0), LiteralValue::Number(40.0));

    // Every column must report the same per-chunk lengths as column 0.
    let lens0: Vec<usize> = sheet.columns[0]
        .chunks
        .iter()
        .map(|ch| ch.type_tag.len())
        .collect();
    sheet.columns.iter().for_each(|col| {
        let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
        assert_eq!(lens, lens0);
    });

    // chunk_starts is strictly increasing and the final chunk ends exactly at nrows.
    assert!(sheet.chunk_starts.windows(2).all(|w| w[0] < w[1]));
    let last_start = sheet.chunk_starts.last().copied().unwrap_or(0);
    let last_len = sheet.columns[0]
        .chunks
        .last()
        .map_or(0, |chunk| chunk.type_tag.len());
    assert_eq!(last_start + last_len, sheet.nrows as usize);
}
6209
#[test]
fn row_insert_delete_preserves_user_dense_fragments() {
    let mut builder = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
    for _ in 0..10 {
        builder.append_row(&[LiteralValue::Empty]).unwrap();
    }
    let mut sheet = builder.finish();

    // Dense user fragment 10/20/30 starting at sheet row 1.
    let (ch_idx, off) = sheet.chunk_of_row(1).unwrap();
    let user_fragment = OverlayFragment::dense_range(
        off,
        vec![
            OverlayValue::Number(10.0),
            OverlayValue::Number(20.0),
            OverlayValue::Number(30.0),
        ],
    )
    .unwrap();
    sheet.columns[0]
        .chunk_mut(ch_idx)
        .unwrap()
        .overlay
        .apply_fragment(user_fragment);

    let before = column_overlay_stats(&sheet, 0, false);
    assert_eq!((before.dense_fragments, before.sparse_fragments), (1, 0));
    assert_column_overlays_normalized(&sheet, 0);

    // Insert two rows at row 2: 10 stays at row 1, 20/30 shift down to rows 4/5.
    sheet.insert_rows(2, 2);
    assert_eq!(sheet.nrows, 12);
    let last_row = (sheet.nrows - 1) as usize;
    let av = sheet.range_view(0, 0, last_row, 0);
    let expected_after_insert = [
        LiteralValue::Number(10.0),
        LiteralValue::Empty,
        LiteralValue::Empty,
        LiteralValue::Number(20.0),
        LiteralValue::Number(30.0),
    ];
    for (row, expected) in (1..).zip(expected_after_insert) {
        assert_eq!(av.get_cell(row, 0), expected);
    }
    let after_insert = column_overlay_stats(&sheet, 0, false);
    assert_eq!(after_insert.sparse_fragments, 0);
    assert!(after_insert.dense_fragments >= 2);
    assert_column_overlays_normalized(&sheet, 0);

    // Delete the two inserted rows again: the values are contiguous at rows 1..=3.
    sheet.delete_rows(2, 2);
    assert_eq!(sheet.nrows, 10);
    let last_row = (sheet.nrows - 1) as usize;
    let av = sheet.range_view(0, 0, last_row, 0);
    for (row, value) in (1..).zip([10.0, 20.0, 30.0]) {
        assert_eq!(av.get_cell(row, 0), LiteralValue::Number(value));
    }
    let after_delete = column_overlay_stats(&sheet, 0, false);
    assert_eq!(after_delete.sparse_fragments, 0);
    assert!(after_delete.dense_fragments >= 1);
    assert_column_overlays_normalized(&sheet, 0);
}
6264
#[test]
fn row_insert_delete_preserves_computed_empty_run_fragments() {
    // Base rows hold 1.0..=8.0 in chunks of 4.
    let mut builder = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
    for value in (1..=8).map(|n| n as f64) {
        builder.append_row(&[LiteralValue::Number(value)]).unwrap();
    }
    let mut sheet = builder.finish();

    // Computed run fragment of three Empty values starting at sheet row 1.
    let (ch_idx, off) = sheet.chunk_of_row(1).unwrap();
    let empty_run = OverlayFragment::run_range(off, vec![OverlayValue::Empty; 3]).unwrap();
    sheet.columns[0]
        .chunk_mut(ch_idx)
        .unwrap()
        .computed_overlay
        .apply_fragment(empty_run);

    let before = column_overlay_stats(&sheet, 0, true);
    assert_eq!((before.run_fragments, before.sparse_fragments), (1, 0));
    assert_column_overlays_normalized(&sheet, 0);

    // Insert one row at row 2: rows 1..=4 read Empty and base value 5.0 moves to row 5.
    sheet.insert_rows(2, 1);
    assert_eq!(sheet.nrows, 9);
    let last_row = (sheet.nrows - 1) as usize;
    let av = sheet.range_view(0, 0, last_row, 0);
    for row in 1..=4 {
        assert_eq!(av.get_cell(row, 0), LiteralValue::Empty);
    }
    assert_eq!(av.get_cell(5, 0), LiteralValue::Number(5.0));
    let after_insert = column_overlay_stats(&sheet, 0, true);
    assert_eq!(after_insert.sparse_fragments, 0);
    assert!(after_insert.run_fragments >= 2);
    assert_column_overlays_normalized(&sheet, 0);

    // Remove the inserted row: rows 1..=3 read Empty and 5.0 is back at row 4.
    sheet.delete_rows(2, 1);
    assert_eq!(sheet.nrows, 8);
    let last_row = (sheet.nrows - 1) as usize;
    let av = sheet.range_view(0, 0, last_row, 0);
    for row in 1..=3 {
        assert_eq!(av.get_cell(row, 0), LiteralValue::Empty);
    }
    assert_eq!(av.get_cell(4, 0), LiteralValue::Number(5.0));
    let after_delete = column_overlay_stats(&sheet, 0, true);
    assert_eq!(after_delete.sparse_fragments, 0);
    assert!(after_delete.run_fragments >= 1);
    assert_column_overlays_normalized(&sheet, 0);
}
6321
#[test]
fn column_insert_delete_retains_chunk_alignment() {
    // Three blank columns, chunk size 3, five rows.
    let blank_row = [
        LiteralValue::Empty,
        LiteralValue::Empty,
        LiteralValue::Empty,
    ];
    let mut builder = IngestBuilder::new("S", 3, 3, crate::engine::DateSystem::Excel1900);
    for _ in 0..5 {
        builder.append_row(&blank_row).unwrap();
    }
    let mut sheet = builder.finish();

    // Per-chunk lengths of the first column serve as the reference shape.
    let ref_lens: Vec<usize> = sheet.columns[0]
        .chunks
        .iter()
        .map(|ch| ch.type_tag.len())
        .collect();

    // Insert 2 columns before index 1: five columns, all with the reference shape.
    sheet.insert_columns(1, 2);
    assert_eq!(sheet.columns.len(), 5);
    sheet.columns.iter().for_each(|col| {
        let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
        assert_eq!(lens, ref_lens);
    });
    let starts_before = sheet.chunk_starts.clone();

    // Delete 2 columns starting at index 2: back to three columns, same shape.
    sheet.delete_columns(2, 2);
    assert_eq!(sheet.columns.len(), 3);
    sheet.columns.iter().for_each(|col| {
        let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
        assert_eq!(lens, ref_lens);
    });

    // Column operations must leave chunk_starts untouched.
    assert_eq!(sheet.chunk_starts, starts_before);
}
6358
6359    #[test]
6360    fn multiple_adjacent_row_ops_overlay_mixed_types() {
6361        use formualizer_common::ExcelErrorKind;
6362        // Two columns to ensure alignment preserved across columns
6363        let mut b = IngestBuilder::new("S", 2, 3, crate::engine::DateSystem::Excel1900);
6364        for _ in 0..9 {
6365            b.append_row(&[LiteralValue::Empty, LiteralValue::Empty])
6366                .unwrap();
6367        }
6368        let mut sheet = b.finish();
6369        // Overlays at rows (0-based): 2->Number, 3->Text, 5->Boolean, 6->Error, 8->Empty
6370        // Column 0 only
6371        let set_ov = |sh: &mut ArrowSheet, row: usize, ov: OverlayValue| {
6372            let (ch_i, off) = sh.chunk_of_row(row).unwrap();
6373            let _ = sh.columns[0].chunks[ch_i].overlay.set(off, ov);
6374        };
6375        set_ov(&mut sheet, 2, OverlayValue::Number(12.5));
6376        set_ov(&mut sheet, 3, OverlayValue::Text(Arc::from("hello")));
6377        set_ov(&mut sheet, 5, OverlayValue::Boolean(true));
6378        set_ov(
6379            &mut sheet,
6380            6,
6381            OverlayValue::Error(map_error_code(ExcelErrorKind::Div)),
6382        );
6383        set_ov(&mut sheet, 8, OverlayValue::Empty);
6384
6385        // Insert 1 row before index 3
6386        sheet.insert_rows(3, 1);
6387        // Expected new positions: 2->2 (unchanged), 3->4, 5->6, 6->7, 8->9
6388        let av1 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
6389        assert_eq!(av1.get_cell(2, 0), LiteralValue::Number(12.5));
6390        assert_eq!(av1.get_cell(4, 0), LiteralValue::Text("hello".into()));
6391        assert_eq!(av1.get_cell(6, 0), LiteralValue::Boolean(true));
6392        match av1.get_cell(7, 0) {
6393            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
6394            other => panic!("expected error at row 7, got {other:?}"),
6395        }
6396        assert_eq!(av1.get_cell(9, 0), LiteralValue::Empty);
6397
6398        // Insert 2 rows before index 4 (adjacent to previous region)
6399        sheet.insert_rows(4, 2);
6400        // Now positions: 2->2, 4->6, 6->8, 7->9, 9->11
6401        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
6402        assert_eq!(av2.get_cell(2, 0), LiteralValue::Number(12.5));
6403        assert_eq!(av2.get_cell(6, 0), LiteralValue::Text("hello".into()));
6404        assert_eq!(av2.get_cell(8, 0), LiteralValue::Boolean(true));
6405        match av2.get_cell(9, 0) {
6406            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
6407            other => panic!("expected error at row 9, got {other:?}"),
6408        }
6409        assert_eq!(av2.get_cell(11, 0), LiteralValue::Empty);
6410
6411        // Delete 2 rows starting at index 6 → removes the text at 6 and one empty row
6412        sheet.delete_rows(6, 2);
6413        let av3 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
6414        // Remaining expected: 2->Number 12.5, 6 (was 8)->true, 7 (was 9)->#DIV/0!, 9 (was 11)->Empty
6415        assert_eq!(av3.get_cell(2, 0), LiteralValue::Number(12.5));
6416        assert_eq!(av3.get_cell(6, 0), LiteralValue::Boolean(true));
6417        match av3.get_cell(7, 0) {
6418            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
6419            other => panic!("expected error at row 8, got {other:?}"),
6420        }
6421        assert_eq!(av3.get_cell(9, 0), LiteralValue::Empty);
6422
6423        // Alignment checks
6424        let lens0: Vec<usize> = sheet.columns[0]
6425            .chunks
6426            .iter()
6427            .map(|ch| ch.type_tag.len())
6428            .collect();
6429        for col in &sheet.columns {
6430            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
6431            assert_eq!(lens, lens0);
6432        }
6433        // chunk_starts monotonically increasing and cover nrows
6434        assert!(sheet.chunk_starts.windows(2).all(|w| w[0] < w[1]));
6435        let last_start = *sheet.chunk_starts.last().unwrap_or(&0);
6436        let last_len = sheet.columns[0]
6437            .chunks
6438            .last()
6439            .map(|c| c.type_tag.len())
6440            .unwrap_or(0);
6441        assert_eq!(last_start + last_len, sheet.nrows as usize);
6442    }
6443
6444    #[test]
6445    fn multiple_adjacent_column_ops_alignment() {
6446        // Start with 2 columns, chunk_rows=2, rows=5
6447        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
6448        for _ in 0..5 {
6449            b.append_row(&[LiteralValue::Empty, LiteralValue::Empty])
6450                .unwrap();
6451        }
6452        let mut sheet = b.finish();
6453        let ref_lens: Vec<usize> = sheet.columns[0]
6454            .chunks
6455            .iter()
6456            .map(|ch| ch.type_tag.len())
6457            .collect();
6458        // Insert 1 at start, then 2 at index 2 → columns = 5
6459        sheet.insert_columns(0, 1);
6460        sheet.insert_columns(2, 2);
6461        assert_eq!(sheet.columns.len(), 5);
6462        for col in &sheet.columns {
6463            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
6464            assert_eq!(lens, ref_lens);
6465        }
6466        let starts_before = sheet.chunk_starts.clone();
6467        // Delete 1 at index 1, then 2 at the end if available
6468        sheet.delete_columns(1, 1);
6469        let remain = sheet.columns.len();
6470        if remain >= 3 {
6471            sheet.delete_columns(remain - 2, 2);
6472        }
6473        for col in &sheet.columns {
6474            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
6475            assert_eq!(lens, ref_lens);
6476        }
6477        assert_eq!(sheet.chunk_starts, starts_before);
6478    }
6479
6480    #[test]
6481    fn overlays_on_multiple_columns_row_col_ops() {
6482        // 3 columns, chunk_rows=3, rows=6 → chunks [0..3), [3..6)
6483        let mut b = IngestBuilder::new("S", 3, 3, crate::engine::DateSystem::Excel1900);
6484        for _ in 0..6 {
6485            b.append_row(&[
6486                LiteralValue::Empty,
6487                LiteralValue::Empty,
6488                LiteralValue::Empty,
6489            ])
6490            .unwrap();
6491        }
6492        let mut sheet = b.finish();
6493        // Overlays at row2 and row3 across columns with different types
6494        let set_ov = |sh: &mut ArrowSheet, col: usize, row: usize, ov: OverlayValue| {
6495            let (ch_i, off) = sh.chunk_of_row(row).unwrap();
6496            let _ = sh.columns[col].chunks[ch_i].overlay.set(off, ov);
6497        };
6498        set_ov(&mut sheet, 0, 2, OverlayValue::Number(12.0));
6499        set_ov(&mut sheet, 1, 2, OverlayValue::Text(Arc::from("xx")));
6500        set_ov(&mut sheet, 2, 2, OverlayValue::Boolean(true));
6501        set_ov(&mut sheet, 0, 3, OverlayValue::Number(33.0));
6502        set_ov(&mut sheet, 1, 3, OverlayValue::Text(Arc::from("yy")));
6503        set_ov(&mut sheet, 2, 3, OverlayValue::Boolean(false));
6504
6505        // Insert a row at boundary (before row index 3)
6506        sheet.insert_rows(3, 1);
6507        // Now original row>=3 shift down by 1
6508        let av = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 2);
6509        // Row 2 values unchanged
6510        assert_eq!(av.get_cell(2, 0), LiteralValue::Number(12.0));
6511        assert_eq!(av.get_cell(2, 1), LiteralValue::Text("xx".into()));
6512        assert_eq!(av.get_cell(2, 2), LiteralValue::Boolean(true));
6513        // Row 3 became Empty (inserted)
6514        assert_eq!(av.get_cell(3, 0), LiteralValue::Empty);
6515        // Row 4 holds old row 3 overlays
6516        assert_eq!(av.get_cell(4, 0), LiteralValue::Number(33.0));
6517        assert_eq!(av.get_cell(4, 1), LiteralValue::Text("yy".into()));
6518        assert_eq!(av.get_cell(4, 2), LiteralValue::Boolean(false));
6519
6520        // Delete column 1 (middle), values shift left
6521        sheet.delete_columns(1, 1);
6522        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 1);
6523        assert_eq!(av2.get_cell(2, 0), LiteralValue::Number(12.0));
6524        // Column 1 now was old column 2
6525        assert_eq!(av2.get_cell(2, 1), LiteralValue::Boolean(true));
6526        assert_eq!(av2.get_cell(4, 0), LiteralValue::Number(33.0));
6527        assert_eq!(av2.get_cell(4, 1), LiteralValue::Boolean(false));
6528
6529        // Alignment preserved
6530        let lens0: Vec<usize> = sheet.columns[0]
6531            .chunks
6532            .iter()
6533            .map(|ch| ch.type_tag.len())
6534            .collect();
6535        for col in &sheet.columns {
6536            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
6537            assert_eq!(lens, lens0);
6538        }
6539    }
6540
6541    #[test]
6542    fn effective_slices_overlay_precedence_numbers_text() {
6543        // 1 column, chunk_rows=3, rows=6. Base numbers in lane; overlays include text on row1 and number on row4.
6544        let mut b = IngestBuilder::new("S", 1, 3, crate::engine::DateSystem::Excel1900);
6545        for i in 0..6 {
6546            b.append_row(&[LiteralValue::Number((i + 1) as f64)])
6547                .unwrap();
6548        }
6549        let mut sheet = b.finish();
6550        // Overlays: row1 -> Text("X"), row4 -> Number(99)
6551        let (c1, o1) = sheet.chunk_of_row(1).unwrap();
6552        sheet.columns[0].chunks[c1]
6553            .overlay
6554            .set(o1, OverlayValue::Text(Arc::from("X")));
6555        let (c4, o4) = sheet.chunk_of_row(4).unwrap();
6556        sheet.columns[0].chunks[c4]
6557            .overlay
6558            .set(o4, OverlayValue::Number(99.0));
6559
6560        let av = sheet.range_view(0, 0, 5, 0);
6561        // Validate numbers_slices: row1 should be null (text overlay), row4 should be 99.0, others base
6562        let mut numeric: Vec<Option<f64>> = vec![None; 6];
6563        for res in av.numbers_slices() {
6564            let (row_start, row_len, cols) = res.unwrap();
6565            let a = &cols[0];
6566            for i in 0..row_len {
6567                let idx = row_start + i;
6568                numeric[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
6569            }
6570        }
6571        assert_eq!(numeric[0], Some(1.0));
6572        assert_eq!(numeric[1], None); // overshadowed by text overlay
6573        assert_eq!(numeric[2], Some(3.0));
6574        assert_eq!(numeric[3], Some(4.0));
6575        assert_eq!(numeric[4], Some(99.0));
6576        assert_eq!(numeric[5], Some(6.0));
6577
6578        // Validate text_slices: row1 has "X", others null
6579        let mut texts: Vec<Option<String>> = vec![None; 6];
6580        for res in av.text_slices() {
6581            let (row_start, row_len, cols) = res.unwrap();
6582            let a = cols[0].as_any().downcast_ref::<StringArray>().unwrap();
6583            for i in 0..row_len {
6584                let idx = row_start + i;
6585                texts[idx] = if a.is_null(i) {
6586                    None
6587                } else {
6588                    Some(a.value(i).to_string())
6589                };
6590            }
6591        }
6592        assert_eq!(texts[1].as_deref(), Some("X"));
6593        assert!(texts[0].is_none());
6594        assert!(texts[2].is_none());
6595        assert!(texts[3].is_none());
6596        assert!(texts[4].is_none());
6597        assert!(texts[5].is_none());
6598    }
6599
6600    #[test]
6601    fn effective_slices_overlay_precedence_booleans() {
6602        // Base booleans over 1 column; overlays include boolean and non-boolean types.
6603        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
6604        for i in 0..6 {
6605            let v = if i % 2 == 0 {
6606                LiteralValue::Boolean(true)
6607            } else {
6608                LiteralValue::Boolean(false)
6609            };
6610            b.append_row(&[v]).unwrap();
6611        }
6612        let mut sheet = b.finish();
6613        // Overlays: row1 -> Boolean(true), row2 -> Text("T")
6614        let (c1, o1) = sheet.chunk_of_row(1).unwrap();
6615        sheet.columns[0].chunks[c1]
6616            .overlay
6617            .set(o1, OverlayValue::Boolean(true));
6618        let (c2, o2) = sheet.chunk_of_row(2).unwrap();
6619        sheet.columns[0].chunks[c2]
6620            .overlay
6621            .set(o2, OverlayValue::Text(Arc::from("T")));
6622
6623        let av = sheet.range_view(0, 0, 5, 0);
6624        // Validate booleans_slices: row1 should be true (overlay), row2 should be null (text overlay), others base
6625        let mut bools: Vec<Option<bool>> = vec![None; 6];
6626        for res in av.booleans_slices() {
6627            let (row_start, row_len, cols) = res.unwrap();
6628            let a = &cols[0];
6629            for i in 0..row_len {
6630                let idx = row_start + i;
6631                bools[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
6632            }
6633        }
6634        assert_eq!(bools[0], Some(true));
6635        assert_eq!(bools[1], Some(true)); // overlay to true
6636        assert_eq!(bools[2], None); // overshadowed by text overlay
6637        // spot-check others remain base
6638        assert_eq!(bools[3], Some(false));
6639    }
6640
6641    #[test]
6642    fn effective_slices_overlay_precedence_errors() {
6643        // Base numbers; overlay an error at one row and ensure errors_slices reflect it.
6644        let mut b = IngestBuilder::new("S", 1, 3, crate::engine::DateSystem::Excel1900);
6645        for i in 0..6 {
6646            b.append_row(&[LiteralValue::Number((i + 1) as f64)])
6647                .unwrap();
6648        }
6649        let mut sheet = b.finish();
6650        // Overlay error at row 4
6651        let (c4, o4) = sheet.chunk_of_row(4).unwrap();
6652        sheet.columns[0].chunks[c4]
6653            .overlay
6654            .set(o4, OverlayValue::Error(map_error_code(ExcelErrorKind::Div)));
6655
6656        let av = sheet.range_view(0, 0, 5, 0);
6657        let mut errs: Vec<Option<u8>> = vec![None; 6];
6658        for res in av.errors_slices() {
6659            let (row_start, row_len, cols) = res.unwrap();
6660            let a = &cols[0];
6661            for i in 0..row_len {
6662                let idx = row_start + i;
6663                errs[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
6664            }
6665        }
6666        assert_eq!(errs[4], Some(map_error_code(ExcelErrorKind::Div)));
6667        assert!(errs[3].is_none());
6668    }
6669}