Skip to main content

formualizer_eval/arrow_store/
mod.rs

1use arrow_array::Array;
2use arrow_array::new_null_array;
3use arrow_schema::DataType;
4use chrono::Timelike;
5use std::sync::Arc;
6
7use arrow_array::builder::{BooleanBuilder, Float64Builder, StringBuilder, UInt8Builder};
8use arrow_array::{ArrayRef, BooleanArray, Float64Array, StringArray, UInt8Array, UInt32Array};
9use once_cell::sync::OnceCell;
10
11use formualizer_common::{ExcelError, ExcelErrorKind, LiteralValue};
12use rustc_hash::FxHashMap;
13use std::collections::{BTreeMap, HashMap};
14
15/// Compact type tag per row (UInt8 backing)
16#[repr(u8)]
17#[derive(Copy, Clone, Debug, Eq, PartialEq)]
18pub enum TypeTag {
19    Empty = 0,
20    Number = 1,
21    Boolean = 2,
22    Text = 3,
23    Error = 4,
24    DateTime = 5, // reserved for future temporal lanes
25    Duration = 6, // reserved
26    Pending = 7,
27}
28
29impl TypeTag {
30    fn from_value(v: &LiteralValue) -> Self {
31        match v {
32            LiteralValue::Empty => TypeTag::Empty,
33            LiteralValue::Int(_) | LiteralValue::Number(_) => TypeTag::Number,
34            LiteralValue::Boolean(_) => TypeTag::Boolean,
35            LiteralValue::Text(_) => TypeTag::Text,
36            LiteralValue::Error(_) => TypeTag::Error,
37            LiteralValue::Date(_) | LiteralValue::DateTime(_) | LiteralValue::Time(_) => {
38                TypeTag::DateTime
39            }
40            LiteralValue::Duration(_) => TypeTag::Duration,
41            LiteralValue::Pending => TypeTag::Pending,
42            LiteralValue::Array(_) => TypeTag::Error, // arrays not storable in a single cell lane
43        }
44    }
45}
46
47impl TypeTag {
48    #[inline]
49    pub fn from_u8(b: u8) -> Self {
50        match b {
51            x if x == TypeTag::Empty as u8 => TypeTag::Empty,
52            x if x == TypeTag::Number as u8 => TypeTag::Number,
53            x if x == TypeTag::Boolean as u8 => TypeTag::Boolean,
54            x if x == TypeTag::Text as u8 => TypeTag::Text,
55            x if x == TypeTag::Error as u8 => TypeTag::Error,
56            x if x == TypeTag::DateTime as u8 => TypeTag::DateTime,
57            x if x == TypeTag::Duration as u8 => TypeTag::Duration,
58            x if x == TypeTag::Pending as u8 => TypeTag::Pending,
59            _ => TypeTag::Empty,
60        }
61    }
62}
63
64#[derive(Debug, Clone, Copy, Default)]
65pub struct ColumnChunkMeta {
66    pub len: usize,
67    pub non_null_num: usize,
68    pub non_null_bool: usize,
69    pub non_null_text: usize,
70    pub non_null_err: usize,
71}
72
73#[derive(Debug, Clone)]
74pub struct ColumnChunk {
75    pub numbers: Option<Arc<Float64Array>>,
76    pub booleans: Option<Arc<BooleanArray>>,
77    pub text: Option<ArrayRef>,          // Utf8 for Phase A
78    pub errors: Option<Arc<UInt8Array>>, // compact error code (UInt8)
79    pub type_tag: Arc<UInt8Array>,
80    pub formula_id: Option<Arc<UInt32Array>>, // reserved for Phase A+
81    pub meta: ColumnChunkMeta,
82    // Lazy null providers (per-chunk)
83    lazy_null_numbers: OnceCell<Arc<Float64Array>>,
84    lazy_null_booleans: OnceCell<Arc<BooleanArray>>,
85    lazy_null_text: OnceCell<ArrayRef>,
86    lazy_null_errors: OnceCell<Arc<UInt8Array>>,
87    // Cache: lowered text lane, nulls preserved
88    lowered_text: OnceCell<ArrayRef>,
89    // Phase C: per-chunk overlay (delta edits since last compaction)
90    pub overlay: Overlay,
91    // Phase 0/1: separate computed overlay (formula/spill outputs)
92    pub computed_overlay: Overlay,
93}
94
95impl ColumnChunk {
96    #[inline]
97    pub fn len(&self) -> usize {
98        self.type_tag.len()
99    }
100    #[inline]
101    pub fn is_empty(&self) -> bool {
102        self.len() == 0
103    }
104    #[inline]
105    pub fn numbers_or_null(&self) -> Arc<Float64Array> {
106        if let Some(a) = &self.numbers {
107            return a.clone();
108        }
109        self.lazy_null_numbers
110            .get_or_init(|| {
111                let arr = new_null_array(&DataType::Float64, self.len());
112                Arc::new(arr.as_any().downcast_ref::<Float64Array>().unwrap().clone())
113            })
114            .clone()
115    }
116    #[inline]
117    pub fn booleans_or_null(&self) -> Arc<BooleanArray> {
118        if let Some(a) = &self.booleans {
119            return a.clone();
120        }
121        self.lazy_null_booleans
122            .get_or_init(|| {
123                let arr = new_null_array(&DataType::Boolean, self.len());
124                Arc::new(arr.as_any().downcast_ref::<BooleanArray>().unwrap().clone())
125            })
126            .clone()
127    }
128    #[inline]
129    pub fn errors_or_null(&self) -> Arc<UInt8Array> {
130        if let Some(a) = &self.errors {
131            return a.clone();
132        }
133        self.lazy_null_errors
134            .get_or_init(|| {
135                let arr = new_null_array(&DataType::UInt8, self.len());
136                Arc::new(arr.as_any().downcast_ref::<UInt8Array>().unwrap().clone())
137            })
138            .clone()
139    }
140    #[inline]
141    pub fn text_or_null(&self) -> ArrayRef {
142        if let Some(a) = &self.text {
143            return a.clone();
144        }
145        self.lazy_null_text
146            .get_or_init(|| new_null_array(&DataType::Utf8, self.len()))
147            .clone()
148    }
149
150    /// Lowercased text lane, with nulls preserved. Cached per chunk.
151    pub fn text_lower_or_null(&self) -> ArrayRef {
152        if let Some(a) = self.lowered_text.get() {
153            return a.clone();
154        }
155        // Lowercase when text present; else return null Utf8
156        let out: ArrayRef = if let Some(txt) = &self.text {
157            let sa = txt.as_any().downcast_ref::<StringArray>().unwrap();
158            let mut b = arrow_array::builder::StringBuilder::with_capacity(sa.len(), sa.len() * 8);
159            for i in 0..sa.len() {
160                if sa.is_null(i) {
161                    b.append_null();
162                } else {
163                    b.append_value(sa.value(i).to_lowercase());
164                }
165            }
166            let lowered = b.finish();
167            Arc::new(lowered)
168        } else {
169            new_null_array(&DataType::Utf8, self.len())
170        };
171        self.lowered_text.get_or_init(|| out.clone());
172        out
173    }
174
175    /// Grow this chunk's logical length to `new_len` (padding with empty/null values).
176    ///
177    /// This is used to keep already-materialized chunks consistent when `ArrowSheet::nrows`
178    /// grows incrementally inside the current last chunk.
179    pub fn grow_len_to(&mut self, new_len: usize) {
180        let old_len = self.len();
181        if new_len <= old_len {
182            return;
183        }
184
185        // Grow type tags (pad with Empty).
186        let mut tags: Vec<u8> = self.type_tag.values().to_vec();
187        tags.resize(new_len, TypeTag::Empty as u8);
188        self.type_tag = Arc::new(UInt8Array::from(tags));
189
190        // Grow lanes when present; append nulls for new rows.
191        if let Some(a) = &self.numbers {
192            use arrow_array::builder::Float64Builder;
193            let mut b = Float64Builder::with_capacity(new_len);
194            for i in 0..old_len {
195                if a.is_null(i) {
196                    b.append_null();
197                } else {
198                    b.append_value(a.value(i));
199                }
200            }
201            for _ in old_len..new_len {
202                b.append_null();
203            }
204            self.numbers = Some(Arc::new(b.finish()));
205        }
206        if let Some(a) = &self.booleans {
207            use arrow_array::builder::BooleanBuilder;
208            let mut b = BooleanBuilder::with_capacity(new_len);
209            for i in 0..old_len {
210                if a.is_null(i) {
211                    b.append_null();
212                } else {
213                    b.append_value(a.value(i));
214                }
215            }
216            for _ in old_len..new_len {
217                b.append_null();
218            }
219            self.booleans = Some(Arc::new(b.finish()));
220        }
221        if let Some(a) = &self.errors {
222            use arrow_array::builder::UInt8Builder;
223            let mut b = UInt8Builder::with_capacity(new_len);
224            for i in 0..old_len {
225                if a.is_null(i) {
226                    b.append_null();
227                } else {
228                    b.append_value(a.value(i));
229                }
230            }
231            for _ in old_len..new_len {
232                b.append_null();
233            }
234            self.errors = Some(Arc::new(b.finish()));
235        }
236        if let Some(a) = &self.text {
237            use arrow_array::builder::StringBuilder;
238            let sa = a.as_any().downcast_ref::<StringArray>().unwrap();
239            let mut b = StringBuilder::with_capacity(new_len, 0);
240            for i in 0..old_len {
241                if sa.is_null(i) {
242                    b.append_null();
243                } else {
244                    b.append_value(sa.value(i));
245                }
246            }
247            for _ in old_len..new_len {
248                b.append_null();
249            }
250            self.text = Some(Arc::new(b.finish()) as ArrayRef);
251        }
252
253        // Length-dependent caches must be dropped.
254        self.lazy_null_numbers = OnceCell::new();
255        self.lazy_null_booleans = OnceCell::new();
256        self.lazy_null_text = OnceCell::new();
257        self.lazy_null_errors = OnceCell::new();
258        self.lowered_text = OnceCell::new();
259
260        self.meta.len = new_len;
261    }
262}
263
264#[derive(Debug, Clone)]
265pub struct ArrowColumn {
266    pub chunks: Vec<ColumnChunk>,
267    pub sparse_chunks: FxHashMap<usize, ColumnChunk>,
268    pub index: u32,
269}
270
271impl ArrowColumn {
272    #[inline]
273    pub fn chunk(&self, idx: usize) -> Option<&ColumnChunk> {
274        if idx < self.chunks.len() {
275            Some(&self.chunks[idx])
276        } else {
277            self.sparse_chunks.get(&idx)
278        }
279    }
280
281    #[inline]
282    pub fn chunk_mut(&mut self, idx: usize) -> Option<&mut ColumnChunk> {
283        if idx < self.chunks.len() {
284            Some(&mut self.chunks[idx])
285        } else {
286            self.sparse_chunks.get_mut(&idx)
287        }
288    }
289
290    #[inline]
291    pub fn has_sparse_chunks(&self) -> bool {
292        !self.sparse_chunks.is_empty()
293    }
294
295    #[inline]
296    pub fn total_chunk_count(&self) -> usize {
297        self.chunks.len() + self.sparse_chunks.len()
298    }
299}
300
301#[derive(Debug, Clone)]
302pub struct ArrowSheet {
303    pub name: Arc<str>,
304    pub columns: Vec<ArrowColumn>,
305    pub nrows: u32,
306    pub chunk_starts: Vec<usize>,
307    /// Preferred chunk size (rows) for capacity growth operations.
308    ///
309    /// For Arrow-ingested sheets this matches the ingest `chunk_rows`. For sparse/overlay-created
310    /// sheets this defaults to 32k to avoid creating thousands of tiny chunks during growth.
311    pub chunk_rows: usize,
312}
313
314#[derive(Debug, Default, Clone)]
315pub struct SheetStore {
316    pub sheets: Vec<ArrowSheet>,
317}
318
319impl SheetStore {
320    pub fn sheet(&self, name: &str) -> Option<&ArrowSheet> {
321        self.sheets.iter().find(|s| s.name.as_ref() == name)
322    }
323    pub fn sheet_mut(&mut self, name: &str) -> Option<&mut ArrowSheet> {
324        self.sheets.iter_mut().find(|s| s.name.as_ref() == name)
325    }
326}
327
328/// Ingestion builder that writes per-column Arrow arrays with a lane/tag design.
329pub struct IngestBuilder {
330    name: Arc<str>,
331    ncols: usize,
332    chunk_rows: usize,
333    date_system: crate::engine::DateSystem,
334
335    // Per-column active builders for current chunk
336    num_builders: Vec<Float64Builder>,
337    bool_builders: Vec<BooleanBuilder>,
338    text_builders: Vec<StringBuilder>,
339    err_builders: Vec<UInt8Builder>,
340    tag_builders: Vec<UInt8Builder>,
341
342    // Per-column per-lane non-null counters for current chunk
343    lane_counts: Vec<LaneCounts>,
344
345    // Accumulated chunks
346    chunks: Vec<Vec<ColumnChunk>>, // indexed by col
347    row_in_chunk: usize,
348    total_rows: u32,
349}
350
351#[derive(Debug, Clone, Copy, Default)]
352struct LaneCounts {
353    n_num: usize,
354    n_bool: usize,
355    n_text: usize,
356    n_err: usize,
357}
358
359impl IngestBuilder {
360    pub fn new(
361        sheet_name: &str,
362        ncols: usize,
363        chunk_rows: usize,
364        date_system: crate::engine::DateSystem,
365    ) -> Self {
366        let mut chunks = Vec::with_capacity(ncols);
367        chunks.resize_with(ncols, Vec::new);
368        Self {
369            name: Arc::from(sheet_name.to_string()),
370            ncols,
371            chunk_rows: chunk_rows.max(1),
372            date_system,
373            num_builders: (0..ncols)
374                .map(|_| Float64Builder::with_capacity(chunk_rows))
375                .collect(),
376            bool_builders: (0..ncols)
377                .map(|_| BooleanBuilder::with_capacity(chunk_rows))
378                .collect(),
379            text_builders: (0..ncols)
380                .map(|_| StringBuilder::with_capacity(chunk_rows, chunk_rows * 12))
381                .collect(),
382            err_builders: (0..ncols)
383                .map(|_| UInt8Builder::with_capacity(chunk_rows))
384                .collect(),
385            tag_builders: (0..ncols)
386                .map(|_| UInt8Builder::with_capacity(chunk_rows))
387                .collect(),
388            lane_counts: vec![LaneCounts::default(); ncols],
389            chunks,
390            row_in_chunk: 0,
391            total_rows: 0,
392        }
393    }
394
395    /// Zero-allocation row append from typed cell tokens (no LiteralValue).
396    /// Text borrows are copied into the internal StringBuilder.
397    pub fn append_row_cells<'a>(&mut self, row: &[CellIngest<'a>]) -> Result<(), ExcelError> {
398        assert_eq!(row.len(), self.ncols, "row width mismatch");
399        for (c, cell) in row.iter().enumerate() {
400            match cell {
401                CellIngest::Empty => {
402                    self.tag_builders[c].append_value(TypeTag::Empty as u8);
403                    self.num_builders[c].append_null();
404                    self.bool_builders[c].append_null();
405                    self.text_builders[c].append_null();
406                    self.err_builders[c].append_null();
407                }
408                CellIngest::Number(n) => {
409                    self.tag_builders[c].append_value(TypeTag::Number as u8);
410                    self.num_builders[c].append_value(*n);
411                    self.lane_counts[c].n_num += 1;
412                    self.bool_builders[c].append_null();
413                    self.text_builders[c].append_null();
414                    self.err_builders[c].append_null();
415                }
416                CellIngest::Boolean(b) => {
417                    self.tag_builders[c].append_value(TypeTag::Boolean as u8);
418                    self.num_builders[c].append_null();
419                    self.bool_builders[c].append_value(*b);
420                    self.lane_counts[c].n_bool += 1;
421                    self.text_builders[c].append_null();
422                    self.err_builders[c].append_null();
423                }
424                CellIngest::Text(s) => {
425                    self.tag_builders[c].append_value(TypeTag::Text as u8);
426                    self.num_builders[c].append_null();
427                    self.bool_builders[c].append_null();
428                    self.text_builders[c].append_value(s);
429                    self.lane_counts[c].n_text += 1;
430                    self.err_builders[c].append_null();
431                }
432                CellIngest::ErrorCode(code) => {
433                    self.tag_builders[c].append_value(TypeTag::Error as u8);
434                    self.num_builders[c].append_null();
435                    self.bool_builders[c].append_null();
436                    self.text_builders[c].append_null();
437                    self.err_builders[c].append_value(*code);
438                    self.lane_counts[c].n_err += 1;
439                }
440                CellIngest::DateSerial(serial) => {
441                    self.tag_builders[c].append_value(TypeTag::DateTime as u8);
442                    self.num_builders[c].append_value(*serial);
443                    self.lane_counts[c].n_num += 1;
444                    self.bool_builders[c].append_null();
445                    self.text_builders[c].append_null();
446                    self.err_builders[c].append_null();
447                }
448                CellIngest::Pending => {
449                    self.tag_builders[c].append_value(TypeTag::Pending as u8);
450                    self.num_builders[c].append_null();
451                    self.bool_builders[c].append_null();
452                    self.text_builders[c].append_null();
453                    self.err_builders[c].append_null();
454                }
455            }
456        }
457        self.row_in_chunk += 1;
458        self.total_rows += 1;
459        if self.row_in_chunk >= self.chunk_rows {
460            self.finish_chunk();
461        }
462        Ok(())
463    }
464
465    /// Streaming row append from an iterator of typed cell tokens.
466    /// Requires an `ExactSizeIterator` to validate row width without materializing a Vec.
467    pub fn append_row_cells_iter<'a, I>(&mut self, iter: I) -> Result<(), ExcelError>
468    where
469        I: ExactSizeIterator<Item = CellIngest<'a>>,
470    {
471        assert_eq!(iter.len(), self.ncols, "row width mismatch");
472        for (c, cell) in iter.enumerate() {
473            match cell {
474                CellIngest::Empty => {
475                    self.tag_builders[c].append_value(TypeTag::Empty as u8);
476                    self.num_builders[c].append_null();
477                    self.bool_builders[c].append_null();
478                    self.text_builders[c].append_null();
479                    self.err_builders[c].append_null();
480                }
481                CellIngest::Number(n) => {
482                    self.tag_builders[c].append_value(TypeTag::Number as u8);
483                    self.num_builders[c].append_value(n);
484                    self.lane_counts[c].n_num += 1;
485                    self.bool_builders[c].append_null();
486                    self.text_builders[c].append_null();
487                    self.err_builders[c].append_null();
488                }
489                CellIngest::Boolean(b) => {
490                    self.tag_builders[c].append_value(TypeTag::Boolean as u8);
491                    self.num_builders[c].append_null();
492                    self.bool_builders[c].append_value(b);
493                    self.lane_counts[c].n_bool += 1;
494                    self.text_builders[c].append_null();
495                    self.err_builders[c].append_null();
496                }
497                CellIngest::Text(s) => {
498                    self.tag_builders[c].append_value(TypeTag::Text as u8);
499                    self.num_builders[c].append_null();
500                    self.bool_builders[c].append_null();
501                    self.text_builders[c].append_value(s);
502                    self.lane_counts[c].n_text += 1;
503                    self.err_builders[c].append_null();
504                }
505                CellIngest::ErrorCode(code) => {
506                    self.tag_builders[c].append_value(TypeTag::Error as u8);
507                    self.num_builders[c].append_null();
508                    self.bool_builders[c].append_null();
509                    self.text_builders[c].append_null();
510                    self.err_builders[c].append_value(code);
511                    self.lane_counts[c].n_err += 1;
512                }
513                CellIngest::DateSerial(serial) => {
514                    self.tag_builders[c].append_value(TypeTag::DateTime as u8);
515                    self.num_builders[c].append_value(serial);
516                    self.lane_counts[c].n_num += 1;
517                    self.bool_builders[c].append_null();
518                    self.text_builders[c].append_null();
519                    self.err_builders[c].append_null();
520                }
521                CellIngest::Pending => {
522                    self.tag_builders[c].append_value(TypeTag::Pending as u8);
523                    self.num_builders[c].append_null();
524                    self.bool_builders[c].append_null();
525                    self.text_builders[c].append_null();
526                    self.err_builders[c].append_null();
527                }
528            }
529        }
530        self.row_in_chunk += 1;
531        self.total_rows += 1;
532        if self.row_in_chunk >= self.chunk_rows {
533            self.finish_chunk();
534        }
535        Ok(())
536    }
537
538    /// Append a single row of values. Length must match `ncols`.
539    pub fn append_row(&mut self, row: &[LiteralValue]) -> Result<(), ExcelError> {
540        assert_eq!(row.len(), self.ncols, "row width mismatch");
541
542        for (c, v) in row.iter().enumerate() {
543            let tag = TypeTag::from_value(v) as u8;
544            self.tag_builders[c].append_value(tag);
545
546            match v {
547                LiteralValue::Empty => {
548                    self.num_builders[c].append_null();
549                    self.bool_builders[c].append_null();
550                    self.text_builders[c].append_null();
551                    self.err_builders[c].append_null();
552                }
553                LiteralValue::Int(i) => {
554                    self.num_builders[c].append_value(*i as f64);
555                    self.lane_counts[c].n_num += 1;
556                    self.bool_builders[c].append_null();
557                    self.text_builders[c].append_null();
558                    self.err_builders[c].append_null();
559                }
560                LiteralValue::Number(n) => {
561                    self.num_builders[c].append_value(*n);
562                    self.lane_counts[c].n_num += 1;
563                    self.bool_builders[c].append_null();
564                    self.text_builders[c].append_null();
565                    self.err_builders[c].append_null();
566                }
567                LiteralValue::Boolean(b) => {
568                    self.num_builders[c].append_null();
569                    self.bool_builders[c].append_value(*b);
570                    self.lane_counts[c].n_bool += 1;
571                    self.text_builders[c].append_null();
572                    self.err_builders[c].append_null();
573                }
574                LiteralValue::Text(s) => {
575                    self.num_builders[c].append_null();
576                    self.bool_builders[c].append_null();
577                    self.text_builders[c].append_value(s);
578                    self.lane_counts[c].n_text += 1;
579                    self.err_builders[c].append_null();
580                }
581                LiteralValue::Error(e) => {
582                    self.num_builders[c].append_null();
583                    self.bool_builders[c].append_null();
584                    self.text_builders[c].append_null();
585                    self.err_builders[c].append_value(map_error_code(e.kind));
586                    self.lane_counts[c].n_err += 1;
587                }
588                // Phase A: coerce temporal to serials in numeric lane with DateTime tag
589                LiteralValue::Date(d) => {
590                    let dt = d.and_hms_opt(0, 0, 0).unwrap();
591                    let serial =
592                        crate::builtins::datetime::datetime_to_serial_for(self.date_system, &dt);
593                    self.num_builders[c].append_value(serial);
594                    self.lane_counts[c].n_num += 1;
595                    self.bool_builders[c].append_null();
596                    self.text_builders[c].append_null();
597                    self.err_builders[c].append_null();
598                }
599                LiteralValue::DateTime(dt) => {
600                    let serial =
601                        crate::builtins::datetime::datetime_to_serial_for(self.date_system, dt);
602                    self.num_builders[c].append_value(serial);
603                    self.lane_counts[c].n_num += 1;
604                    self.bool_builders[c].append_null();
605                    self.text_builders[c].append_null();
606                    self.err_builders[c].append_null();
607                }
608                LiteralValue::Time(t) => {
609                    let serial = t.num_seconds_from_midnight() as f64 / 86_400.0;
610                    self.num_builders[c].append_value(serial);
611                    self.lane_counts[c].n_num += 1;
612                    self.bool_builders[c].append_null();
613                    self.text_builders[c].append_null();
614                    self.err_builders[c].append_null();
615                }
616                LiteralValue::Duration(dur) => {
617                    let serial = dur.num_seconds() as f64 / 86_400.0;
618                    self.num_builders[c].append_value(serial);
619                    self.lane_counts[c].n_num += 1;
620                    self.bool_builders[c].append_null();
621                    self.text_builders[c].append_null();
622                    self.err_builders[c].append_null();
623                }
624                LiteralValue::Array(_) => {
625                    // Not allowed as a stored scalar; mark as error kind VALUE
626                    self.num_builders[c].append_null();
627                    self.bool_builders[c].append_null();
628                    self.text_builders[c].append_null();
629                    self.err_builders[c].append_value(map_error_code(ExcelErrorKind::Value));
630                    self.lane_counts[c].n_err += 1;
631                }
632                LiteralValue::Pending => {
633                    // Pending: tag only; all lanes remain null (no error)
634                    self.num_builders[c].append_null();
635                    self.bool_builders[c].append_null();
636                    self.text_builders[c].append_null();
637                    self.err_builders[c].append_null();
638                }
639            }
640        }
641
642        self.row_in_chunk += 1;
643        self.total_rows += 1;
644
645        if self.row_in_chunk >= self.chunk_rows {
646            self.finish_chunk();
647        }
648
649        Ok(())
650    }
651
652    fn finish_chunk(&mut self) {
653        if self.row_in_chunk == 0 {
654            return;
655        }
656        for c in 0..self.ncols {
657            let len = self.row_in_chunk;
658            let numbers_arc: Option<Arc<Float64Array>> = if self.lane_counts[c].n_num == 0 {
659                None
660            } else {
661                Some(Arc::new(self.num_builders[c].finish()))
662            };
663            let booleans_arc: Option<Arc<BooleanArray>> = if self.lane_counts[c].n_bool == 0 {
664                None
665            } else {
666                Some(Arc::new(self.bool_builders[c].finish()))
667            };
668            let text_ref: Option<ArrayRef> = if self.lane_counts[c].n_text == 0 {
669                None
670            } else {
671                Some(Arc::new(self.text_builders[c].finish()))
672            };
673            let errors_arc: Option<Arc<UInt8Array>> = if self.lane_counts[c].n_err == 0 {
674                None
675            } else {
676                Some(Arc::new(self.err_builders[c].finish()))
677            };
678            let tags: UInt8Array = self.tag_builders[c].finish();
679
680            let chunk = ColumnChunk {
681                numbers: numbers_arc,
682                booleans: booleans_arc,
683                text: text_ref,
684                errors: errors_arc,
685                type_tag: Arc::new(tags),
686                formula_id: None,
687                meta: ColumnChunkMeta {
688                    len,
689                    non_null_num: self.lane_counts[c].n_num,
690                    non_null_bool: self.lane_counts[c].n_bool,
691                    non_null_text: self.lane_counts[c].n_text,
692                    non_null_err: self.lane_counts[c].n_err,
693                },
694                lazy_null_numbers: OnceCell::new(),
695                lazy_null_booleans: OnceCell::new(),
696                lazy_null_text: OnceCell::new(),
697                lazy_null_errors: OnceCell::new(),
698                lowered_text: OnceCell::new(),
699                overlay: Overlay::new(),
700                computed_overlay: Overlay::new(),
701            };
702            self.chunks[c].push(chunk);
703
704            // re-init builders for next chunk
705            self.num_builders[c] = Float64Builder::with_capacity(self.chunk_rows);
706            self.bool_builders[c] = BooleanBuilder::with_capacity(self.chunk_rows);
707            self.text_builders[c] =
708                StringBuilder::with_capacity(self.chunk_rows, self.chunk_rows * 12);
709            self.err_builders[c] = UInt8Builder::with_capacity(self.chunk_rows);
710            self.tag_builders[c] = UInt8Builder::with_capacity(self.chunk_rows);
711            self.lane_counts[c] = LaneCounts::default();
712        }
713        self.row_in_chunk = 0;
714    }
715
716    pub fn finish(mut self) -> ArrowSheet {
717        // flush partial chunk
718        if self.row_in_chunk > 0 {
719            self.finish_chunk();
720        }
721
722        let mut columns = Vec::with_capacity(self.ncols);
723        for (idx, chunks) in self.chunks.into_iter().enumerate() {
724            columns.push(ArrowColumn {
725                chunks,
726                sparse_chunks: FxHashMap::default(),
727                index: idx as u32,
728            });
729        }
730        // Precompute chunk starts from first column and enforce alignment across columns
731        let mut chunk_starts: Vec<usize> = Vec::new();
732        if let Some(col0) = columns.first() {
733            let chunks_len0 = col0.chunks.len();
734            for (ci, col) in columns.iter().enumerate() {
735                if col.chunks.len() != chunks_len0 {
736                    panic!(
737                        "ArrowSheet chunk misalignment: column {} chunks={} != {}",
738                        ci,
739                        col.chunks.len(),
740                        chunks_len0
741                    );
742                }
743            }
744            let mut cur = 0usize;
745            for i in 0..chunks_len0 {
746                let len_i = col0.chunks[i].type_tag.len();
747                for (ci, col) in columns.iter().enumerate() {
748                    let got = col.chunks[i].type_tag.len();
749                    if got != len_i {
750                        panic!(
751                            "ArrowSheet chunk row-length misalignment at chunk {i}: col {ci} len={got} != {len_i}"
752                        );
753                    }
754                }
755                chunk_starts.push(cur);
756                cur += len_i;
757            }
758        }
759        ArrowSheet {
760            name: self.name,
761            columns,
762            nrows: self.total_rows,
763            chunk_starts,
764            chunk_rows: self.chunk_rows,
765        }
766    }
767}
768
769pub fn map_error_code(kind: ExcelErrorKind) -> u8 {
770    match kind {
771        ExcelErrorKind::Null => 1,
772        ExcelErrorKind::Ref => 2,
773        ExcelErrorKind::Name => 3,
774        ExcelErrorKind::Value => 4,
775        ExcelErrorKind::Div => 5,
776        ExcelErrorKind::Na => 6,
777        ExcelErrorKind::Num => 7,
778        ExcelErrorKind::Error => 8,
779        ExcelErrorKind::NImpl => 9,
780        ExcelErrorKind::Spill => 10,
781        ExcelErrorKind::Calc => 11,
782        ExcelErrorKind::Circ => 12,
783        ExcelErrorKind::Cancelled => 13,
784    }
785}
786
787pub fn unmap_error_code(code: u8) -> ExcelErrorKind {
788    match code {
789        1 => ExcelErrorKind::Null,
790        2 => ExcelErrorKind::Ref,
791        3 => ExcelErrorKind::Name,
792        4 => ExcelErrorKind::Value,
793        5 => ExcelErrorKind::Div,
794        6 => ExcelErrorKind::Na,
795        7 => ExcelErrorKind::Num,
796        8 => ExcelErrorKind::Error,
797        9 => ExcelErrorKind::NImpl,
798        10 => ExcelErrorKind::Spill,
799        11 => ExcelErrorKind::Calc,
800        12 => ExcelErrorKind::Circ,
801        13 => ExcelErrorKind::Cancelled,
802        _ => ExcelErrorKind::Error,
803    }
804}
805
806// ─────────────────────────── Overlay (Phase C) ────────────────────────────
807
808/// Zero-allocation cell token for ingestion.
809pub enum CellIngest<'a> {
810    Empty,
811    Number(f64),
812    Boolean(bool),
813    Text(&'a str),
814    ErrorCode(u8),
815    DateSerial(f64),
816    Pending,
817}
818
819#[derive(Debug, Clone, PartialEq)]
820pub enum OverlayValue {
821    Empty,
822    Number(f64),
823    /// Date/Time/DateTime stored as an Excel serial in the numeric lane.
824    DateTime(f64),
825    /// Duration stored as an Excel-style day-fraction in the numeric lane.
826    Duration(f64),
827    Boolean(bool),
828    Text(Arc<str>),
829    Error(u8),
830    Pending,
831}
832
833impl OverlayValue {
834    pub fn from_literal_value(
835        value: &LiteralValue,
836        date_system: crate::engine::DateSystem,
837    ) -> Self {
838        match value {
839            LiteralValue::Empty => OverlayValue::Empty,
840            LiteralValue::Int(i) => OverlayValue::Number(*i as f64),
841            LiteralValue::Number(n) => OverlayValue::Number(*n),
842            LiteralValue::Boolean(b) => OverlayValue::Boolean(*b),
843            LiteralValue::Text(s) => OverlayValue::Text(Arc::from(s.clone())),
844            LiteralValue::Error(e) => OverlayValue::Error(map_error_code(e.kind)),
845            LiteralValue::Date(d) => {
846                let dt = d.and_hms_opt(0, 0, 0).unwrap();
847                OverlayValue::DateTime(crate::builtins::datetime::datetime_to_serial_for(
848                    date_system,
849                    &dt,
850                ))
851            }
852            LiteralValue::DateTime(dt) => OverlayValue::DateTime(
853                crate::builtins::datetime::datetime_to_serial_for(date_system, dt),
854            ),
855            LiteralValue::Time(t) => {
856                OverlayValue::DateTime(t.num_seconds_from_midnight() as f64 / 86_400.0)
857            }
858            LiteralValue::Duration(d) => OverlayValue::Duration(d.num_seconds() as f64 / 86_400.0),
859            LiteralValue::Pending => OverlayValue::Pending,
860            LiteralValue::Array(_) => OverlayValue::Error(map_error_code(ExcelErrorKind::Value)),
861        }
862    }
863
864    #[inline]
865    pub(crate) fn estimated_payload_bytes(&self) -> usize {
866        match self {
867            OverlayValue::Empty | OverlayValue::Pending => 0,
868            OverlayValue::Number(_) | OverlayValue::DateTime(_) | OverlayValue::Duration(_) => {
869                core::mem::size_of::<f64>()
870            }
871            OverlayValue::Boolean(_) => core::mem::size_of::<bool>(),
872            OverlayValue::Error(_) => core::mem::size_of::<u8>(),
873            // Deterministic estimate: count string bytes only.
874            OverlayValue::Text(s) => s.len(),
875        }
876    }
877
878    #[inline]
879    pub(crate) fn type_tag(&self) -> TypeTag {
880        match self {
881            OverlayValue::Empty => TypeTag::Empty,
882            OverlayValue::Number(_) => TypeTag::Number,
883            OverlayValue::DateTime(_) => TypeTag::DateTime,
884            OverlayValue::Duration(_) => TypeTag::Duration,
885            OverlayValue::Boolean(_) => TypeTag::Boolean,
886            OverlayValue::Text(_) => TypeTag::Text,
887            OverlayValue::Error(_) => TypeTag::Error,
888            OverlayValue::Pending => TypeTag::Pending,
889        }
890    }
891
892    #[inline]
893    pub(crate) fn numeric_lane_value(&self) -> Option<f64> {
894        match self {
895            OverlayValue::Number(n) | OverlayValue::DateTime(n) | OverlayValue::Duration(n) => {
896                Some(*n)
897            }
898            _ => None,
899        }
900    }
901
902    #[inline]
903    pub(crate) fn boolean_lane_value(&self) -> Option<bool> {
904        match self {
905            OverlayValue::Boolean(b) => Some(*b),
906            _ => None,
907        }
908    }
909
910    #[inline]
911    pub(crate) fn text_lane_value(&self) -> Option<&str> {
912        match self {
913            OverlayValue::Text(s) => Some(s.as_ref()),
914            _ => None,
915        }
916    }
917
918    #[inline]
919    pub(crate) fn error_lane_value(&self) -> Option<u8> {
920        match self {
921            OverlayValue::Error(code) => Some(*code),
922            _ => None,
923        }
924    }
925
926    pub(crate) fn lowered_text_value(&self) -> Option<String> {
927        match self {
928            OverlayValue::Text(s) => Some(s.to_lowercase()),
929            OverlayValue::Number(n) | OverlayValue::DateTime(n) | OverlayValue::Duration(n) => {
930                Some(n.to_string())
931            }
932            OverlayValue::Boolean(b) => Some(if *b { "true" } else { "false" }.to_string()),
933            OverlayValue::Empty | OverlayValue::Error(_) | OverlayValue::Pending => None,
934        }
935    }
936
937    pub(crate) fn to_literal(&self) -> LiteralValue {
938        match self {
939            OverlayValue::Empty => LiteralValue::Empty,
940            OverlayValue::Number(n) => LiteralValue::Number(*n),
941            OverlayValue::DateTime(serial) => LiteralValue::from_serial_number(*serial),
942            OverlayValue::Duration(serial) => {
943                let nanos_f = *serial * 86_400.0 * 1_000_000_000.0;
944                let nanos = nanos_f.round().clamp(i64::MIN as f64, i64::MAX as f64) as i64;
945                LiteralValue::Duration(chrono::Duration::nanoseconds(nanos))
946            }
947            OverlayValue::Boolean(b) => LiteralValue::Boolean(*b),
948            OverlayValue::Text(s) => LiteralValue::Text((**s).to_string()),
949            OverlayValue::Error(code) => {
950                LiteralValue::Error(ExcelError::new(unmap_error_code(*code)))
951            }
952            OverlayValue::Pending => LiteralValue::Pending,
953        }
954    }
955}
956
957#[derive(Debug, Clone)]
958pub(crate) enum OverlayScalar<'a> {
959    Borrowed(&'a OverlayValue),
960    Owned(OverlayValue),
961}
962
963impl<'a> OverlayScalar<'a> {
964    #[inline]
965    fn as_value(&self) -> &OverlayValue {
966        match self {
967            OverlayScalar::Borrowed(value) => value,
968            OverlayScalar::Owned(value) => value,
969        }
970    }
971
972    #[inline]
973    pub(crate) fn to_overlay_value(&self) -> OverlayValue {
974        self.as_value().clone()
975    }
976
977    #[inline]
978    pub(crate) fn type_tag(&self) -> TypeTag {
979        self.as_value().type_tag()
980    }
981
982    #[inline]
983    pub(crate) fn numeric_lane_value(&self) -> Option<f64> {
984        self.as_value().numeric_lane_value()
985    }
986
987    #[inline]
988    pub(crate) fn boolean_lane_value(&self) -> Option<bool> {
989        self.as_value().boolean_lane_value()
990    }
991
992    #[inline]
993    pub(crate) fn text_lane_value(&self) -> Option<&str> {
994        self.as_value().text_lane_value()
995    }
996
997    #[inline]
998    pub(crate) fn error_lane_value(&self) -> Option<u8> {
999        self.as_value().error_lane_value()
1000    }
1001
1002    pub(crate) fn lowered_text_value(&self) -> Option<String> {
1003        self.as_value().lowered_text_value()
1004    }
1005
1006    pub(crate) fn to_literal(&self) -> LiteralValue {
1007        self.as_value().to_literal()
1008    }
1009}
1010
1011const OVERLAY_ENTRY_BASE_BYTES: usize = 32;
1012const OVERLAY_FRAGMENT_BASE_BYTES: usize = 48;
1013
1014#[allow(dead_code)]
1015#[derive(Debug, Clone)]
1016pub(crate) struct OverlayFragmentPayload {
1017    type_tags: Arc<UInt8Array>,
1018    numbers: Option<Arc<Float64Array>>,
1019    booleans: Option<Arc<BooleanArray>>,
1020    text: Option<ArrayRef>,
1021    errors: Option<Arc<UInt8Array>>,
1022    estimated_bytes: usize,
1023}
1024
1025impl OverlayFragmentPayload {
1026    fn from_values(values: Vec<OverlayValue>) -> Self {
1027        let len = values.len();
1028        let mut tag_b = UInt8Builder::with_capacity(len);
1029        let mut nb = Float64Builder::with_capacity(len);
1030        let mut bb = BooleanBuilder::with_capacity(len);
1031        let mut sb = StringBuilder::with_capacity(len, len.saturating_mul(8));
1032        let mut eb = UInt8Builder::with_capacity(len);
1033        let mut non_num = 0usize;
1034        let mut non_bool = 0usize;
1035        let mut non_text = 0usize;
1036        let mut non_err = 0usize;
1037
1038        for value in &values {
1039            append_overlay_value_to_lane_builders(
1040                value,
1041                &mut tag_b,
1042                &mut nb,
1043                &mut bb,
1044                &mut sb,
1045                &mut eb,
1046                &mut non_num,
1047                &mut non_bool,
1048                &mut non_text,
1049                &mut non_err,
1050            );
1051        }
1052
1053        let type_tags = Arc::new(tag_b.finish());
1054        let numbers = {
1055            let a = nb.finish();
1056            (non_num > 0).then(|| Arc::new(a))
1057        };
1058        let booleans = {
1059            let a = bb.finish();
1060            (non_bool > 0).then(|| Arc::new(a))
1061        };
1062        let text = {
1063            let a = sb.finish();
1064            (non_text > 0).then(|| Arc::new(a) as ArrayRef)
1065        };
1066        let errors = {
1067            let a = eb.finish();
1068            (non_err > 0).then(|| Arc::new(a))
1069        };
1070
1071        let estimated_bytes = type_tags
1072            .get_array_memory_size()
1073            .saturating_add(
1074                numbers
1075                    .as_ref()
1076                    .map(|a| a.get_array_memory_size())
1077                    .unwrap_or(0),
1078            )
1079            .saturating_add(
1080                booleans
1081                    .as_ref()
1082                    .map(|a| a.get_array_memory_size())
1083                    .unwrap_or(0),
1084            )
1085            .saturating_add(
1086                text.as_ref()
1087                    .map(|a| a.get_array_memory_size())
1088                    .unwrap_or(0),
1089            )
1090            .saturating_add(
1091                errors
1092                    .as_ref()
1093                    .map(|a| a.get_array_memory_size())
1094                    .unwrap_or(0),
1095            );
1096
1097        Self {
1098            type_tags,
1099            numbers,
1100            booleans,
1101            text,
1102            errors,
1103            estimated_bytes,
1104        }
1105    }
1106
1107    fn overlay_value(&self, idx: usize) -> Option<OverlayValue> {
1108        if idx >= self.type_tags.len() || self.type_tags.is_null(idx) {
1109            return None;
1110        }
1111        match TypeTag::from_u8(self.type_tags.value(idx)) {
1112            TypeTag::Empty => Some(OverlayValue::Empty),
1113            TypeTag::Number => Some(OverlayValue::Number(self.number_at(idx)?)),
1114            TypeTag::DateTime => Some(OverlayValue::DateTime(self.number_at(idx)?)),
1115            TypeTag::Duration => Some(OverlayValue::Duration(self.number_at(idx)?)),
1116            TypeTag::Boolean => Some(OverlayValue::Boolean(self.boolean_at(idx)?)),
1117            TypeTag::Text => Some(OverlayValue::Text(Arc::from(self.text_at(idx)?))),
1118            TypeTag::Error => Some(OverlayValue::Error(self.error_at(idx)?)),
1119            TypeTag::Pending => Some(OverlayValue::Pending),
1120        }
1121    }
1122
1123    #[inline]
1124    fn get_scalar(&self, idx: usize) -> Option<OverlayScalar<'_>> {
1125        self.overlay_value(idx).map(OverlayScalar::Owned)
1126    }
1127
1128    #[inline]
1129    fn number_at(&self, idx: usize) -> Option<f64> {
1130        let arr = self.numbers.as_ref()?;
1131        (!arr.is_null(idx)).then(|| arr.value(idx))
1132    }
1133
1134    #[inline]
1135    fn boolean_at(&self, idx: usize) -> Option<bool> {
1136        let arr = self.booleans.as_ref()?;
1137        (!arr.is_null(idx)).then(|| arr.value(idx))
1138    }
1139
1140    #[inline]
1141    fn text_at(&self, idx: usize) -> Option<&str> {
1142        let arr = self.text.as_ref()?;
1143        let arr = arr.as_any().downcast_ref::<StringArray>()?;
1144        (!arr.is_null(idx)).then(|| arr.value(idx))
1145    }
1146
1147    #[inline]
1148    fn error_at(&self, idx: usize) -> Option<u8> {
1149        let arr = self.errors.as_ref()?;
1150        (!arr.is_null(idx)).then(|| arr.value(idx))
1151    }
1152
1153    #[inline]
1154    fn values_slice(&self, start: usize, len: usize) -> Vec<OverlayValue> {
1155        (start..start.saturating_add(len))
1156            .filter_map(|idx| self.overlay_value(idx))
1157            .collect()
1158    }
1159
1160    #[inline]
1161    fn estimated_bytes(&self) -> usize {
1162        self.estimated_bytes
1163    }
1164}
1165#[derive(Debug, Clone)]
1166pub(crate) enum OverlayFragment {
1167    SparseOffsets {
1168        offsets: Vec<u32>,
1169        payload: OverlayFragmentPayload,
1170    },
1171    DenseRange {
1172        start: u32,
1173        len: u32,
1174        payload: OverlayFragmentPayload,
1175    },
1176    RunRange {
1177        start: u32,
1178        len: u32,
1179        run_ends: Vec<u32>,
1180        payload: OverlayFragmentPayload,
1181    },
1182}
1183
1184impl OverlayFragment {
1185    const MAX_SPLIT_SEGMENTS_BEFORE_SPARSE_FALLBACK: usize = 128;
1186
1187    pub(crate) fn sparse_offsets(items: Vec<(usize, OverlayValue)>) -> Option<Self> {
1188        let mut by_offset: BTreeMap<usize, OverlayValue> = BTreeMap::new();
1189        for (offset, value) in items {
1190            by_offset.insert(offset, value);
1191        }
1192        if by_offset.is_empty() {
1193            return None;
1194        }
1195
1196        let mut offsets = Vec::with_capacity(by_offset.len());
1197        let mut values = Vec::with_capacity(by_offset.len());
1198        for (offset, value) in by_offset {
1199            offsets.push(u32::try_from(offset).expect("overlay offset fits in u32"));
1200            values.push(value);
1201        }
1202
1203        Some(Self::SparseOffsets {
1204            offsets,
1205            payload: OverlayFragmentPayload::from_values(values),
1206        })
1207    }
1208
1209    pub(crate) fn sparse_offsets_if_estimated_smaller_than_points(
1210        items: Vec<(usize, OverlayValue)>,
1211        point_estimate: usize,
1212    ) -> Option<Result<Self, Vec<(usize, OverlayValue)>>> {
1213        let fragment = Self::sparse_offsets(items)?;
1214        if fragment.estimated_bytes() < point_estimate {
1215            Some(Ok(fragment))
1216        } else {
1217            Some(Err(fragment.cells()))
1218        }
1219    }
1220
1221    pub(crate) fn dense_range(start: usize, values: Vec<OverlayValue>) -> Option<Self> {
1222        let len = values.len();
1223        if len == 0 {
1224            return None;
1225        }
1226        Some(Self::DenseRange {
1227            start: u32::try_from(start).expect("overlay start fits in u32"),
1228            len: u32::try_from(len).expect("overlay length fits in u32"),
1229            payload: OverlayFragmentPayload::from_values(values),
1230        })
1231    }
1232
1233    pub(crate) fn run_range(start: usize, values: Vec<OverlayValue>) -> Option<Self> {
1234        if values.is_empty() {
1235            return None;
1236        }
1237
1238        let mut run_ends = Vec::new();
1239        let mut run_values = Vec::new();
1240        let mut current = values[0].clone();
1241        for (idx, value) in values.iter().enumerate().skip(1) {
1242            if *value != current {
1243                run_ends.push(idx);
1244                run_values.push(current);
1245                current = value.clone();
1246            }
1247        }
1248        run_ends.push(values.len());
1249        run_values.push(current);
1250
1251        Self::run_range_from_parts(start, values.len(), run_ends, run_values)
1252    }
1253
1254    fn run_range_from_parts(
1255        start: usize,
1256        len: usize,
1257        run_ends: Vec<usize>,
1258        values: Vec<OverlayValue>,
1259    ) -> Option<Self> {
1260        if len == 0 || run_ends.is_empty() || run_ends.len() != values.len() {
1261            return None;
1262        }
1263
1264        let mut merged_ends: Vec<u32> = Vec::with_capacity(run_ends.len());
1265        let mut merged_values: Vec<OverlayValue> = Vec::with_capacity(values.len());
1266        let mut prev_end = 0usize;
1267        for (end, value) in run_ends.into_iter().zip(values.into_iter()) {
1268            if end <= prev_end || end > len {
1269                return None;
1270            }
1271            if merged_values.last().is_some_and(|last| *last == value) {
1272                if let Some(last_end) = merged_ends.last_mut() {
1273                    *last_end = u32::try_from(end).expect("run end fits in u32");
1274                }
1275            } else {
1276                merged_ends.push(u32::try_from(end).expect("run end fits in u32"));
1277                merged_values.push(value);
1278            }
1279            prev_end = end;
1280        }
1281
1282        if prev_end != len || merged_ends.last().copied() != Some(len as u32) {
1283            return None;
1284        }
1285
1286        Some(Self::RunRange {
1287            start: u32::try_from(start).expect("overlay start fits in u32"),
1288            len: u32::try_from(len).expect("overlay length fits in u32"),
1289            run_ends: merged_ends,
1290            payload: OverlayFragmentPayload::from_values(merged_values),
1291        })
1292    }
1293
1294    #[inline]
1295    fn estimated_bytes(&self) -> usize {
1296        match self {
1297            OverlayFragment::SparseOffsets { offsets, payload } => OVERLAY_FRAGMENT_BASE_BYTES
1298                .saturating_add(offsets.len().saturating_mul(core::mem::size_of::<u32>()))
1299                .saturating_add(payload.estimated_bytes()),
1300            OverlayFragment::DenseRange { payload, .. } => {
1301                OVERLAY_FRAGMENT_BASE_BYTES.saturating_add(payload.estimated_bytes())
1302            }
1303            OverlayFragment::RunRange {
1304                run_ends, payload, ..
1305            } => OVERLAY_FRAGMENT_BASE_BYTES
1306                .saturating_add(run_ends.len().saturating_mul(core::mem::size_of::<u32>()))
1307                .saturating_add(payload.estimated_bytes()),
1308        }
1309    }
1310
1311    #[inline]
1312    fn coverage_len(&self) -> usize {
1313        match self {
1314            OverlayFragment::SparseOffsets { offsets, .. } => offsets.len(),
1315            OverlayFragment::DenseRange { len, .. } | OverlayFragment::RunRange { len, .. } => {
1316                *len as usize
1317            }
1318        }
1319    }
1320
1321    pub(crate) fn max_covered_offset(&self) -> usize {
1322        match self {
1323            OverlayFragment::SparseOffsets { offsets, .. } => {
1324                offsets.iter().copied().max().unwrap_or(0) as usize
1325            }
1326            OverlayFragment::DenseRange { start, len, .. }
1327            | OverlayFragment::RunRange { start, len, .. } => (*start as usize)
1328                .saturating_add(*len as usize)
1329                .saturating_sub(1),
1330        }
1331    }
1332
1333    fn interval_coverage(&self) -> Option<core::ops::Range<usize>> {
1334        match self {
1335            OverlayFragment::DenseRange { start, len, .. }
1336            | OverlayFragment::RunRange { start, len, .. } => {
1337                let start = *start as usize;
1338                Some(start..start.saturating_add(*len as usize))
1339            }
1340            OverlayFragment::SparseOffsets { .. } => None,
1341        }
1342    }
1343
1344    fn sparse_offsets_slice(&self) -> Option<&[u32]> {
1345        match self {
1346            OverlayFragment::SparseOffsets { offsets, .. } => Some(offsets.as_slice()),
1347            _ => None,
1348        }
1349    }
1350
1351    fn has_any_in_range(&self, range: core::ops::Range<usize>) -> bool {
1352        if range.is_empty() {
1353            return false;
1354        }
1355        match self {
1356            OverlayFragment::SparseOffsets { offsets, .. } => {
1357                let start = u32::try_from(range.start).unwrap_or(u32::MAX);
1358                let idx = offsets.partition_point(|off| *off < start);
1359                offsets
1360                    .get(idx)
1361                    .is_some_and(|off| (*off as usize) < range.end)
1362            }
1363            OverlayFragment::DenseRange { .. } | OverlayFragment::RunRange { .. } => self
1364                .interval_coverage()
1365                .is_some_and(|r| r.start < range.end && range.start < r.end),
1366        }
1367    }
1368
1369    fn intersects_fragment_exact(&self, replacement: &OverlayFragment) -> bool {
1370        if let Some(offsets) = replacement.sparse_offsets_slice() {
1371            self.intersects_sparse_offsets(offsets)
1372        } else if let Some(range) = replacement.interval_coverage() {
1373            self.intersects_interval(range)
1374        } else {
1375            false
1376        }
1377    }
1378
1379    fn intersects_interval(&self, range: core::ops::Range<usize>) -> bool {
1380        if range.is_empty() {
1381            return false;
1382        }
1383        match self {
1384            OverlayFragment::SparseOffsets { offsets, .. } => {
1385                let start = u32::try_from(range.start).unwrap_or(u32::MAX);
1386                let idx = offsets.partition_point(|off| *off < start);
1387                offsets
1388                    .get(idx)
1389                    .is_some_and(|off| (*off as usize) < range.end)
1390            }
1391            OverlayFragment::DenseRange { .. } | OverlayFragment::RunRange { .. } => self
1392                .interval_coverage()
1393                .is_some_and(|own| own.start < range.end && range.start < own.end),
1394        }
1395    }
1396
1397    fn intersects_sparse_offsets(&self, replacement_offsets: &[u32]) -> bool {
1398        if replacement_offsets.is_empty() {
1399            return false;
1400        }
1401        match self {
1402            OverlayFragment::SparseOffsets { offsets, .. } => {
1403                Self::sorted_offsets_intersect(offsets, replacement_offsets)
1404            }
1405            OverlayFragment::DenseRange { .. } | OverlayFragment::RunRange { .. } => {
1406                self.interval_coverage().is_some_and(|range| {
1407                    let start = u32::try_from(range.start).unwrap_or(u32::MAX);
1408                    let idx = replacement_offsets.partition_point(|off| *off < start);
1409                    replacement_offsets
1410                        .get(idx)
1411                        .is_some_and(|off| (*off as usize) < range.end)
1412                })
1413            }
1414        }
1415    }
1416
1417    fn sorted_offsets_intersect(a: &[u32], b: &[u32]) -> bool {
1418        let mut ai = 0usize;
1419        let mut bi = 0usize;
1420        while ai < a.len() && bi < b.len() {
1421            match a[ai].cmp(&b[bi]) {
1422                core::cmp::Ordering::Equal => return true,
1423                core::cmp::Ordering::Less => ai += 1,
1424                core::cmp::Ordering::Greater => bi += 1,
1425            }
1426        }
1427        false
1428    }
1429
1430    fn covers_offset(&self, off: usize) -> bool {
1431        self.get_scalar(off).is_some()
1432    }
1433
1434    fn get_scalar(&self, off: usize) -> Option<OverlayScalar<'_>> {
1435        match self {
1436            OverlayFragment::SparseOffsets { offsets, payload } => {
1437                let off = u32::try_from(off).ok()?;
1438                let idx = offsets.binary_search(&off).ok()?;
1439                payload.get_scalar(idx)
1440            }
1441            OverlayFragment::DenseRange {
1442                start,
1443                len,
1444                payload,
1445            } => {
1446                let start = *start as usize;
1447                let rel = off.checked_sub(start)?;
1448                if rel >= *len as usize {
1449                    return None;
1450                }
1451                payload.get_scalar(rel)
1452            }
1453            OverlayFragment::RunRange {
1454                start,
1455                len,
1456                run_ends,
1457                payload,
1458            } => {
1459                let start = *start as usize;
1460                let rel = off.checked_sub(start)?;
1461                if rel >= *len as usize {
1462                    return None;
1463                }
1464                let rel_u32 = u32::try_from(rel).ok()?;
1465                let run_idx = run_ends.partition_point(|end| *end <= rel_u32);
1466                payload.get_scalar(run_idx)
1467            }
1468        }
1469    }
1470
1471    fn subtract_fragment(&self, replacement: &OverlayFragment) -> Vec<OverlayFragment> {
1472        if let Some(offsets) = replacement.sparse_offsets_slice() {
1473            self.subtract_sparse_offsets(offsets)
1474        } else if let Some(range) = replacement.interval_coverage() {
1475            self.subtract_interval(range)
1476        } else {
1477            vec![self.clone()]
1478        }
1479    }
1480
1481    fn subtract_offset(&self, off: usize) -> Vec<OverlayFragment> {
1482        match self {
1483            OverlayFragment::SparseOffsets { .. } => {
1484                let Ok(off) = u32::try_from(off) else {
1485                    return vec![self.clone()];
1486                };
1487                self.subtract_sparse_offsets(core::slice::from_ref(&off))
1488            }
1489            OverlayFragment::DenseRange { .. } | OverlayFragment::RunRange { .. } => {
1490                self.subtract_interval(off..off.saturating_add(1))
1491            }
1492        }
1493    }
1494
1495    fn subtract_interval(&self, replacement: core::ops::Range<usize>) -> Vec<OverlayFragment> {
1496        if replacement.is_empty() {
1497            return vec![self.clone()];
1498        }
1499
1500        match self {
1501            OverlayFragment::SparseOffsets { offsets, payload } => {
1502                let cells: Vec<_> = offsets
1503                    .iter()
1504                    .enumerate()
1505                    .filter_map(|(idx, off)| {
1506                        let off_usize = *off as usize;
1507                        (!replacement.contains(&off_usize))
1508                            .then(|| payload.overlay_value(idx).map(|value| (off_usize, value)))?
1509                    })
1510                    .collect();
1511                OverlayFragment::sparse_offsets(cells).into_iter().collect()
1512            }
1513            OverlayFragment::DenseRange { .. } => {
1514                let Some(own) = self.interval_coverage() else {
1515                    return vec![self.clone()];
1516                };
1517                if own.end <= replacement.start || replacement.end <= own.start {
1518                    return vec![self.clone()];
1519                }
1520                let cut_start = replacement.start.max(own.start);
1521                let cut_end = replacement.end.min(own.end);
1522                let mut out = Vec::with_capacity(2);
1523                if own.start < cut_start
1524                    && let Some(left) =
1525                        self.dense_segment_with_start(own.start, own.start, cut_start)
1526                {
1527                    out.push(left);
1528                }
1529                if cut_end < own.end
1530                    && let Some(right) = self.dense_segment_with_start(cut_end, cut_end, own.end)
1531                {
1532                    out.push(right);
1533                }
1534                out
1535            }
1536            OverlayFragment::RunRange { .. } => {
1537                let Some(own) = self.interval_coverage() else {
1538                    return vec![self.clone()];
1539                };
1540                if own.end <= replacement.start || replacement.end <= own.start {
1541                    return vec![self.clone()];
1542                }
1543                let cut_start = replacement.start.max(own.start);
1544                let cut_end = replacement.end.min(own.end);
1545                let mut out = Vec::with_capacity(2);
1546                if own.start < cut_start
1547                    && let Some(left) = self.run_segment_with_start(own.start, own.start, cut_start)
1548                {
1549                    out.push(left);
1550                }
1551                if cut_end < own.end
1552                    && let Some(right) = self.run_segment_with_start(cut_end, cut_end, own.end)
1553                {
1554                    out.push(right);
1555                }
1556                out
1557            }
1558        }
1559    }
1560
1561    fn subtract_sparse_offsets(&self, replacement_offsets: &[u32]) -> Vec<OverlayFragment> {
1562        if replacement_offsets.is_empty() {
1563            return vec![self.clone()];
1564        }
1565
1566        match self {
1567            OverlayFragment::SparseOffsets { offsets, payload } => {
1568                let cells: Vec<_> = offsets
1569                    .iter()
1570                    .enumerate()
1571                    .filter_map(|(idx, off)| {
1572                        replacement_offsets.binary_search(off).is_err().then(|| {
1573                            payload
1574                                .overlay_value(idx)
1575                                .map(|value| (*off as usize, value))
1576                        })?
1577                    })
1578                    .collect();
1579                OverlayFragment::sparse_offsets(cells).into_iter().collect()
1580            }
1581            OverlayFragment::DenseRange { .. } => {
1582                self.subtract_sparse_offsets_from_dense(replacement_offsets)
1583            }
1584            OverlayFragment::RunRange { .. } => {
1585                self.subtract_sparse_offsets_from_run(replacement_offsets)
1586            }
1587        }
1588    }
1589
1590    fn sparse_holes_in_interval(offsets: &[u32], range: core::ops::Range<usize>) -> Vec<usize> {
1591        if range.is_empty() {
1592            return Vec::new();
1593        }
1594        let start = u32::try_from(range.start).unwrap_or(u32::MAX);
1595        let mut idx = offsets.partition_point(|off| *off < start);
1596        let mut holes = Vec::new();
1597        let mut last = None;
1598        while let Some(off) = offsets.get(idx).copied() {
1599            let off_usize = off as usize;
1600            if off_usize >= range.end {
1601                break;
1602            }
1603            if last != Some(off_usize) {
1604                holes.push(off_usize);
1605                last = Some(off_usize);
1606            }
1607            idx += 1;
1608        }
1609        holes
1610    }
1611
1612    fn subtract_sparse_offsets_from_dense(
1613        &self,
1614        replacement_offsets: &[u32],
1615    ) -> Vec<OverlayFragment> {
1616        let Some(own) = self.interval_coverage() else {
1617            return vec![self.clone()];
1618        };
1619        let holes = Self::sparse_holes_in_interval(replacement_offsets, own.clone());
1620        if holes.is_empty() {
1621            return vec![self.clone()];
1622        }
1623        if holes.len().saturating_add(1) > Self::MAX_SPLIT_SEGMENTS_BEFORE_SPARSE_FALLBACK {
1624            return self.sparse_remainder_excluding_offsets(&holes);
1625        }
1626
1627        let mut out = Vec::with_capacity(holes.len().saturating_add(1));
1628        let mut seg_start = own.start;
1629        for hole in holes {
1630            if seg_start < hole
1631                && let Some(segment) = self.dense_segment_with_start(seg_start, seg_start, hole)
1632            {
1633                out.push(segment);
1634            }
1635            seg_start = hole.saturating_add(1);
1636        }
1637        if seg_start < own.end
1638            && let Some(segment) = self.dense_segment_with_start(seg_start, seg_start, own.end)
1639        {
1640            out.push(segment);
1641        }
1642        out
1643    }
1644
1645    fn subtract_sparse_offsets_from_run(
1646        &self,
1647        replacement_offsets: &[u32],
1648    ) -> Vec<OverlayFragment> {
1649        let Some(own) = self.interval_coverage() else {
1650            return vec![self.clone()];
1651        };
1652        let holes = Self::sparse_holes_in_interval(replacement_offsets, own.clone());
1653        if holes.is_empty() {
1654            return vec![self.clone()];
1655        }
1656        if holes.len().saturating_add(1) > Self::MAX_SPLIT_SEGMENTS_BEFORE_SPARSE_FALLBACK {
1657            return self.sparse_remainder_excluding_offsets(&holes);
1658        }
1659
1660        let mut out = Vec::with_capacity(holes.len().saturating_add(1));
1661        let mut seg_start = own.start;
1662        for hole in holes {
1663            if seg_start < hole
1664                && let Some(segment) = self.run_segment_with_start(seg_start, seg_start, hole)
1665            {
1666                out.push(segment);
1667            }
1668            seg_start = hole.saturating_add(1);
1669        }
1670        if seg_start < own.end
1671            && let Some(segment) = self.run_segment_with_start(seg_start, seg_start, own.end)
1672        {
1673            out.push(segment);
1674        }
1675        out
1676    }
1677
1678    fn sparse_remainder_excluding_offsets(&self, sorted_holes: &[usize]) -> Vec<OverlayFragment> {
1679        let cells: Vec<_> = self
1680            .cells()
1681            .into_iter()
1682            .filter(|(off, _)| sorted_holes.binary_search(off).is_err())
1683            .collect();
1684        OverlayFragment::sparse_offsets(cells).into_iter().collect()
1685    }
1686
1687    fn dense_segment_with_start(
1688        &self,
1689        new_start: usize,
1690        abs_start: usize,
1691        abs_end: usize,
1692    ) -> Option<OverlayFragment> {
1693        match self {
1694            OverlayFragment::DenseRange { start, payload, .. } => {
1695                if abs_start >= abs_end {
1696                    return None;
1697                }
1698                let base = *start as usize;
1699                let rel_start = abs_start.checked_sub(base)?;
1700                let len = abs_end.saturating_sub(abs_start);
1701                OverlayFragment::dense_range(new_start, payload.values_slice(rel_start, len))
1702            }
1703            _ => None,
1704        }
1705    }
1706
1707    fn run_segment_with_start(
1708        &self,
1709        new_start: usize,
1710        abs_start: usize,
1711        abs_end: usize,
1712    ) -> Option<OverlayFragment> {
1713        let OverlayFragment::RunRange {
1714            start,
1715            len,
1716            run_ends,
1717            payload,
1718        } = self
1719        else {
1720            return None;
1721        };
1722        if abs_start >= abs_end {
1723            return None;
1724        }
1725        let base = *start as usize;
1726        let frag_end = base.saturating_add(*len as usize);
1727        if abs_start < base || abs_end > frag_end {
1728            return None;
1729        }
1730
1731        let rel_start = abs_start - base;
1732        let rel_end = abs_end - base;
1733        let mut new_run_ends = Vec::new();
1734        let mut new_values = Vec::new();
1735        let mut prev_end = 0usize;
1736
1737        for (run_idx, end) in run_ends.iter().enumerate() {
1738            let run_start = prev_end;
1739            let run_end = *end as usize;
1740            let inter_start = run_start.max(rel_start);
1741            let inter_end = run_end.min(rel_end);
1742            if inter_start < inter_end {
1743                new_run_ends.push(inter_end - rel_start);
1744                if let Some(value) = payload.overlay_value(run_idx) {
1745                    new_values.push(value);
1746                }
1747            }
1748            prev_end = run_end;
1749            if prev_end >= rel_end {
1750                break;
1751            }
1752        }
1753
1754        OverlayFragment::run_range_from_parts(
1755            new_start,
1756            abs_end.saturating_sub(abs_start),
1757            new_run_ends,
1758            new_values,
1759        )
1760    }
1761
1762    fn cells(&self) -> Vec<(usize, OverlayValue)> {
1763        match self {
1764            OverlayFragment::SparseOffsets { offsets, payload } => offsets
1765                .iter()
1766                .enumerate()
1767                .filter_map(|(idx, off)| {
1768                    payload
1769                        .overlay_value(idx)
1770                        .map(|value| (*off as usize, value))
1771                })
1772                .collect(),
1773            OverlayFragment::DenseRange {
1774                start,
1775                len,
1776                payload,
1777            } => {
1778                let start = *start as usize;
1779                (0..*len as usize)
1780                    .filter_map(|idx| {
1781                        payload
1782                            .overlay_value(idx)
1783                            .map(|value| (start.saturating_add(idx), value))
1784                    })
1785                    .collect()
1786            }
1787            OverlayFragment::RunRange { start, len, .. } => {
1788                let start = *start as usize;
1789                (0..*len as usize)
1790                    .filter_map(|idx| {
1791                        self.get_scalar(start.saturating_add(idx))
1792                            .map(|value| (start.saturating_add(idx), value.to_overlay_value()))
1793                    })
1794                    .collect()
1795            }
1796        }
1797    }
1798
1799    fn slice(&self, off: usize, len: usize) -> Option<OverlayFragment> {
1800        let end = off.saturating_add(len);
1801        if len == 0 {
1802            return None;
1803        }
1804
1805        match self {
1806            OverlayFragment::SparseOffsets { offsets, payload } => {
1807                let start = u32::try_from(off).unwrap_or(u32::MAX);
1808                let lo = offsets.partition_point(|candidate| *candidate < start);
1809                let hi = offsets.partition_point(|candidate| (*candidate as usize) < end);
1810                let cells: Vec<_> = (lo..hi)
1811                    .filter_map(|idx| {
1812                        let rebased = (offsets[idx] as usize).saturating_sub(off);
1813                        payload.overlay_value(idx).map(|value| (rebased, value))
1814                    })
1815                    .collect();
1816                OverlayFragment::sparse_offsets(cells)
1817            }
1818            OverlayFragment::DenseRange { .. } => {
1819                let own = self.interval_coverage()?;
1820                let seg_start = own.start.max(off);
1821                let seg_end = own.end.min(end);
1822                if seg_start >= seg_end {
1823                    return None;
1824                }
1825                self.dense_segment_with_start(seg_start - off, seg_start, seg_end)
1826            }
1827            OverlayFragment::RunRange { .. } => {
1828                let own = self.interval_coverage()?;
1829                let seg_start = own.start.max(off);
1830                let seg_end = own.end.min(end);
1831                if seg_start >= seg_end {
1832                    return None;
1833                }
1834                self.run_segment_with_start(seg_start - off, seg_start, seg_end)
1835            }
1836        }
1837    }
1838}
1839#[derive(Debug, Default, Clone)]
1840pub struct Overlay {
1841    points: HashMap<usize, OverlayValue>,
1842    fragments: Vec<OverlayFragment>,
1843    // Deterministic (and intentionally approximate) accounting of overlay memory.
1844    // This is used for budget enforcement/observability; it does not attempt to reflect
1845    // the allocator's exact overhead.
1846    estimated_bytes: usize,
1847}
1848
1849impl Overlay {
1850    // Deterministic estimate per entry to keep budget enforcement stable across platforms.
1851    // Includes key + map/node overhead (approx) and value payload bytes.
1852    const ENTRY_BASE_BYTES: usize = OVERLAY_ENTRY_BASE_BYTES;
1853
1854    pub fn new() -> Self {
1855        Self {
1856            points: HashMap::new(),
1857            fragments: Vec::new(),
1858            estimated_bytes: 0,
1859        }
1860    }
1861
1862    #[inline]
1863    fn point_estimate(v: &OverlayValue) -> usize {
1864        Self::ENTRY_BASE_BYTES + v.estimated_payload_bytes()
1865    }
1866
1867    #[inline]
1868    fn adjust_estimated_bytes(&mut self, delta: isize) {
1869        if delta >= 0 {
1870            self.estimated_bytes = self.estimated_bytes.saturating_add(delta as usize);
1871        } else {
1872            self.estimated_bytes = self.estimated_bytes.saturating_sub((-delta) as usize);
1873        }
1874    }
1875
1876    #[inline]
1877    pub(crate) fn get_scalar(&self, off: usize) -> Option<OverlayScalar<'_>> {
1878        self.points
1879            .get(&off)
1880            .map(OverlayScalar::Borrowed)
1881            .or_else(|| self.fragments.iter().rev().find_map(|f| f.get_scalar(off)))
1882    }
1883
1884    #[inline]
1885    pub fn get(&self, off: usize) -> Option<OverlayValue> {
1886        self.get_scalar(off).map(|value| value.to_overlay_value())
1887    }
1888
1889    #[inline]
1890    pub(crate) fn set_scalar(&mut self, off: usize, v: OverlayValue) -> isize {
1891        let removed = self.remove_scalar(off);
1892        let new_est = Self::point_estimate(&v);
1893        self.points.insert(off, v);
1894        self.adjust_estimated_bytes(new_est as isize);
1895        removed.saturating_add(new_est as isize)
1896    }
1897
1898    #[inline]
1899    pub fn set(&mut self, off: usize, v: OverlayValue) -> isize {
1900        self.set_scalar(off, v)
1901    }
1902
1903    pub(crate) fn apply_fragment(&mut self, fragment: OverlayFragment) -> isize {
1904        let mut delta = self.remove_points_covered_by_fragment(&fragment);
1905        delta = delta.saturating_add(self.remove_fragments_covered_by_fragment(&fragment));
1906
1907        let fragment_est = fragment.estimated_bytes();
1908        self.fragments.push(fragment);
1909        self.adjust_estimated_bytes(fragment_est as isize);
1910        delta.saturating_add(fragment_est as isize)
1911    }
1912
1913    fn remove_points_covered_by_fragment(&mut self, fragment: &OverlayFragment) -> isize {
1914        let mut removed = 0usize;
1915        match fragment {
1916            OverlayFragment::SparseOffsets { offsets, .. } => {
1917                for off in offsets.iter().copied() {
1918                    if let Some(old) = self.points.remove(&(off as usize)) {
1919                        removed = removed.saturating_add(Self::point_estimate(&old));
1920                    }
1921                }
1922            }
1923            OverlayFragment::DenseRange { .. } | OverlayFragment::RunRange { .. } => {
1924                if let Some(range) = fragment.interval_coverage() {
1925                    let keys: Vec<_> = self
1926                        .points
1927                        .keys()
1928                        .copied()
1929                        .filter(|off| range.contains(off))
1930                        .collect();
1931                    for off in keys {
1932                        if let Some(old) = self.points.remove(&off) {
1933                            removed = removed.saturating_add(Self::point_estimate(&old));
1934                        }
1935                    }
1936                }
1937            }
1938        }
1939        self.estimated_bytes = self.estimated_bytes.saturating_sub(removed);
1940        -(removed as isize)
1941    }
1942
1943    fn remove_fragments_covered_by_fragment(&mut self, replacement: &OverlayFragment) -> isize {
1944        if self.fragments.is_empty() {
1945            return 0;
1946        }
1947
1948        let mut delta: isize = 0;
1949        let mut fragments = Vec::with_capacity(self.fragments.len());
1950        for fragment in self.fragments.drain(..) {
1951            if !fragment.intersects_fragment_exact(replacement) {
1952                fragments.push(fragment);
1953                continue;
1954            }
1955
1956            let old_est = fragment.estimated_bytes();
1957            let replacements = fragment.subtract_fragment(replacement);
1958            let new_est = replacements
1959                .iter()
1960                .map(OverlayFragment::estimated_bytes)
1961                .fold(0usize, usize::saturating_add);
1962            fragments.extend(replacements);
1963            delta = delta.saturating_add(new_est as isize - old_est as isize);
1964        }
1965        self.fragments = fragments;
1966        self.adjust_estimated_bytes(delta);
1967        delta
1968    }
1969
1970    #[inline]
1971    pub(crate) fn remove_scalar(&mut self, off: usize) -> isize {
1972        let mut delta = 0isize;
1973        if let Some(old) = self.points.remove(&off) {
1974            let old_est = Self::point_estimate(&old);
1975            self.estimated_bytes = self.estimated_bytes.saturating_sub(old_est);
1976            delta = delta.saturating_sub(old_est as isize);
1977        }
1978
1979        if !self.fragments.is_empty() {
1980            let mut fragments = Vec::with_capacity(self.fragments.len());
1981            for fragment in self.fragments.drain(..) {
1982                if fragment.get_scalar(off).is_none() {
1983                    fragments.push(fragment);
1984                    continue;
1985                }
1986
1987                let old_est = fragment.estimated_bytes();
1988                let replacements = fragment.subtract_offset(off);
1989                let new_est = replacements
1990                    .iter()
1991                    .map(OverlayFragment::estimated_bytes)
1992                    .fold(0usize, usize::saturating_add);
1993                fragments.extend(replacements);
1994                delta = delta.saturating_add(new_est as isize - old_est as isize);
1995            }
1996            self.fragments = fragments;
1997            self.adjust_estimated_bytes(delta);
1998        }
1999
2000        delta
2001    }
2002
2003    #[inline]
2004    pub fn remove(&mut self, off: usize) -> isize {
2005        self.remove_scalar(off)
2006    }
2007
2008    pub(crate) fn remove_range(&mut self, range: core::ops::Range<usize>) -> isize {
2009        if range.is_empty() {
2010            return 0;
2011        }
2012
2013        let mut delta = 0isize;
2014        let removed_points: Vec<_> = self
2015            .points
2016            .keys()
2017            .copied()
2018            .filter(|off| range.contains(off))
2019            .collect();
2020        for off in removed_points {
2021            if let Some(old) = self.points.remove(&off) {
2022                let old_est = Self::point_estimate(&old);
2023                self.estimated_bytes = self.estimated_bytes.saturating_sub(old_est);
2024                delta = delta.saturating_sub(old_est as isize);
2025            }
2026        }
2027
2028        if !self.fragments.is_empty() {
2029            let mut fragment_delta = 0isize;
2030            let mut fragments = Vec::with_capacity(self.fragments.len());
2031            for fragment in self.fragments.drain(..) {
2032                let old_est = fragment.estimated_bytes();
2033                let replacements = fragment.subtract_interval(range.clone());
2034                let new_est = replacements
2035                    .iter()
2036                    .map(OverlayFragment::estimated_bytes)
2037                    .fold(0usize, usize::saturating_add);
2038                fragments.extend(replacements);
2039                fragment_delta = fragment_delta.saturating_add(new_est as isize - old_est as isize);
2040            }
2041            self.fragments = fragments;
2042            self.adjust_estimated_bytes(fragment_delta);
2043            delta = delta.saturating_add(fragment_delta);
2044        }
2045
2046        delta
2047    }
2048
2049    #[inline]
2050    pub(crate) fn clear_all(&mut self) -> usize {
2051        let freed = self.estimated_bytes;
2052        self.points.clear();
2053        self.fragments.clear();
2054        self.estimated_bytes = 0;
2055        freed
2056    }
2057
2058    #[inline]
2059    pub fn clear(&mut self) -> usize {
2060        self.clear_all()
2061    }
2062
2063    #[inline]
2064    pub fn len(&self) -> usize {
2065        self.points.len().saturating_add(
2066            self.fragments
2067                .iter()
2068                .map(OverlayFragment::coverage_len)
2069                .sum(),
2070        )
2071    }
2072
2073    #[inline]
2074    pub fn estimated_bytes(&self) -> usize {
2075        self.estimated_bytes
2076    }
2077
2078    #[inline]
2079    pub fn is_empty(&self) -> bool {
2080        self.points.is_empty() && self.fragments.is_empty()
2081    }
2082
2083    #[inline]
2084    pub(crate) fn has_any_in_range(&self, range: core::ops::Range<usize>) -> bool {
2085        self.points.keys().any(|k| range.contains(k))
2086            || self
2087                .fragments
2088                .iter()
2089                .any(|fragment| fragment.has_any_in_range(range.clone()))
2090    }
2091
2092    #[inline]
2093    pub fn any_in_range(&self, range: core::ops::Range<usize>) -> bool {
2094        self.has_any_in_range(range)
2095    }
2096
2097    pub(crate) fn slice(&self, off: usize, len: usize) -> Overlay {
2098        let mut out = Overlay::new();
2099        let end = off.saturating_add(len);
2100        for fragment in &self.fragments {
2101            if let Some(sliced) = fragment.slice(off, len) {
2102                let _ = out.apply_fragment(sliced);
2103            }
2104        }
2105        for (k, v) in self.points.iter() {
2106            if *k >= off && *k < end {
2107                let _ = out.set_scalar(*k - off, v.clone());
2108            }
2109        }
2110        out
2111    }
2112
2113    /// Iterate over logical `(offset, value)` pairs in the overlay.
2114    pub fn iter(&self) -> impl Iterator<Item = (usize, OverlayValue)> {
2115        let mut cells = BTreeMap::new();
2116        for fragment in &self.fragments {
2117            for (off, value) in fragment.cells() {
2118                cells.insert(off, value);
2119            }
2120        }
2121        for (off, value) in &self.points {
2122            cells.insert(*off, value.clone());
2123        }
2124        cells.into_iter()
2125    }
2126
2127    /// Iterate over physical point entries only.
2128    pub(crate) fn iter_points(&self) -> impl Iterator<Item = (&usize, &OverlayValue)> {
2129        self.points.iter()
2130    }
2131}
2132
2133#[cfg(test)]
2134#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
2135pub(crate) struct OverlayDebugStats {
2136    pub(crate) points: usize,
2137    pub(crate) sparse_fragments: usize,
2138    pub(crate) dense_fragments: usize,
2139    pub(crate) run_fragments: usize,
2140    pub(crate) covered_len: usize,
2141}
2142
2143#[cfg(test)]
2144impl Overlay {
2145    pub(crate) fn debug_stats(&self) -> OverlayDebugStats {
2146        let mut stats = OverlayDebugStats {
2147            points: self.points.len(),
2148            covered_len: self.len(),
2149            ..OverlayDebugStats::default()
2150        };
2151        for fragment in &self.fragments {
2152            match fragment {
2153                OverlayFragment::SparseOffsets { .. } => stats.sparse_fragments += 1,
2154                OverlayFragment::DenseRange { .. } => stats.dense_fragments += 1,
2155                OverlayFragment::RunRange { .. } => stats.run_fragments += 1,
2156            }
2157        }
2158        stats
2159    }
2160
2161    pub(crate) fn debug_is_normalized(&self) -> bool {
2162        let mut covered = std::collections::HashSet::new();
2163        for off in self.points.keys().copied() {
2164            if !covered.insert(off) {
2165                return false;
2166            }
2167        }
2168        for fragment in &self.fragments {
2169            for (off, _) in fragment.cells() {
2170                if !covered.insert(off) {
2171                    return false;
2172                }
2173            }
2174        }
2175        covered.len() == self.len()
2176    }
2177
2178    pub(crate) fn debug_recomputed_estimated_bytes(&self) -> usize {
2179        let point_bytes = self
2180            .points
2181            .values()
2182            .map(Self::point_estimate)
2183            .fold(0usize, usize::saturating_add);
2184        let fragment_bytes = self
2185            .fragments
2186            .iter()
2187            .map(OverlayFragment::estimated_bytes)
2188            .fold(0usize, usize::saturating_add);
2189        point_bytes.saturating_add(fragment_bytes)
2190    }
2191}
2192
2193#[derive(Debug, Clone, Copy, Default)]
2194#[cfg_attr(test, derive(serde::Serialize))]
2195pub(crate) struct OverlaySelectStats {
2196    pub(crate) zip_select_calls: usize,
2197    pub(crate) direct_dense_slices: usize,
2198    pub(crate) direct_run_materializations: usize,
2199    pub(crate) partial_sparse_intersections: usize,
2200    pub(crate) partial_dense_intersections: usize,
2201    pub(crate) partial_run_intersections: usize,
2202    pub(crate) partial_overlay_builds: usize,
2203    pub(crate) row_scalar_fallbacks: usize,
2204    pub(crate) point_entries_applied: usize,
2205    pub(crate) fragment_intersections: usize,
2206}
2207
2208#[cfg(test)]
2209thread_local! {
2210    static OVERLAY_SELECT_STATS: std::cell::RefCell<OverlaySelectStats> =
2211        std::cell::RefCell::new(OverlaySelectStats::default());
2212}
2213
2214#[cfg(test)]
2215pub(crate) fn reset_overlay_select_stats() {
2216    OVERLAY_SELECT_STATS.with(|stats| *stats.borrow_mut() = OverlaySelectStats::default());
2217}
2218
2219#[cfg(test)]
2220pub(crate) fn snapshot_overlay_select_stats() -> OverlaySelectStats {
2221    OVERLAY_SELECT_STATS.with(|stats| *stats.borrow())
2222}
2223
2224#[cfg(test)]
2225fn record_overlay_select_stats(f: impl FnOnce(&mut OverlaySelectStats)) {
2226    OVERLAY_SELECT_STATS.with(|stats| f(&mut stats.borrow_mut()));
2227}
2228
2229#[cfg(not(test))]
2230#[inline]
2231fn record_overlay_select_stats(_f: impl FnOnce(&mut OverlaySelectStats)) {}
2232
2233#[derive(Debug, Clone, Copy, Eq, PartialEq)]
2234enum OverlayFragmentShape {
2235    Sparse,
2236    Dense,
2237    Run,
2238}
2239
2240struct OverlaySlots<T> {
2241    present: Vec<bool>,
2242    values: Vec<Option<T>>,
2243    any_present: bool,
2244}
2245
2246impl<T> OverlaySlots<T> {
2247    fn new(len: usize) -> Self {
2248        Self {
2249            present: vec![false; len],
2250            values: (0..len).map(|_| None).collect(),
2251            any_present: false,
2252        }
2253    }
2254
2255    #[inline]
2256    fn set(&mut self, idx: usize, value: Option<T>) {
2257        if idx >= self.present.len() {
2258            return;
2259        }
2260        self.present[idx] = true;
2261        self.values[idx] = value;
2262        self.any_present = true;
2263    }
2264
2265    #[inline]
2266    fn any_present(&self) -> bool {
2267        self.any_present
2268    }
2269}
2270
2271pub(crate) struct OverlayCascade<'a> {
2272    user: &'a Overlay,
2273    computed: &'a Overlay,
2274}
2275
2276impl<'a> OverlayCascade<'a> {
2277    #[inline]
2278    pub(crate) fn new(user: &'a Overlay, computed: &'a Overlay) -> Self {
2279        Self { user, computed }
2280    }
2281
2282    #[inline]
2283    pub(crate) fn get_scalar(&self, off: usize) -> Option<OverlayScalar<'a>> {
2284        self.user
2285            .get_scalar(off)
2286            .or_else(|| self.computed.get_scalar(off))
2287    }
2288
2289    #[inline]
2290    pub(crate) fn has_any_in_range(&self, range: core::ops::Range<usize>) -> bool {
2291        self.user.has_any_in_range(range.clone()) || self.computed.has_any_in_range(range)
2292    }
2293
2294    pub(crate) fn select_numbers(
2295        &self,
2296        range: core::ops::Range<usize>,
2297        base: &Float64Array,
2298    ) -> Arc<Float64Array> {
2299        if let Some(fragment) = self.user.full_cover_dense_fragment(range.clone()) {
2300            record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2301            return Self::dense_numbers(fragment, range);
2302        }
2303        if let Some(fragment) = self.user.full_cover_run_fragment(range.clone()) {
2304            record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2305            return Self::run_numbers(fragment, range);
2306        }
2307        if !self.user.has_any_in_range(range.clone()) {
2308            if let Some(fragment) = self.computed.full_cover_dense_fragment(range.clone()) {
2309                record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2310                return Self::dense_numbers(fragment, range);
2311            }
2312            if let Some(fragment) = self.computed.full_cover_run_fragment(range.clone()) {
2313                record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2314                return Self::run_numbers(fragment, range);
2315            }
2316        }
2317
2318        if !self.has_any_in_range(range.clone()) {
2319            return Arc::new(base.clone());
2320        }
2321
2322        record_overlay_select_stats(|stats| stats.partial_overlay_builds += 1);
2323        let len = range.end.saturating_sub(range.start);
2324        let mut slots = OverlaySlots::<f64>::new(len);
2325        Self::apply_number_layer(self.computed, range.clone(), &mut slots);
2326        Self::apply_number_layer(self.user, range.clone(), &mut slots);
2327        if !slots.any_present() {
2328            return Arc::new(base.clone());
2329        }
2330
2331        let mut mask_b = BooleanBuilder::with_capacity(len);
2332        let mut values_b = Float64Builder::with_capacity(len);
2333        for idx in 0..len {
2334            mask_b.append_value(slots.present[idx]);
2335            match slots.values[idx] {
2336                Some(value) => values_b.append_value(value),
2337                None => values_b.append_null(),
2338            }
2339        }
2340        record_overlay_select_stats(|stats| stats.zip_select_calls += 1);
2341        let mask = mask_b.finish();
2342        let values = values_b.finish();
2343        let zipped =
2344            crate::compute_prelude::zip_select(&mask, &values, base).expect("zip numeric overlay");
2345        Arc::new(
2346            zipped
2347                .as_any()
2348                .downcast_ref::<Float64Array>()
2349                .expect("numeric overlay zip type")
2350                .clone(),
2351        )
2352    }
2353
2354    pub(crate) fn select_booleans(
2355        &self,
2356        range: core::ops::Range<usize>,
2357        base: &BooleanArray,
2358    ) -> Arc<BooleanArray> {
2359        if let Some(fragment) = self.user.full_cover_dense_fragment(range.clone()) {
2360            record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2361            return Self::dense_booleans(fragment, range);
2362        }
2363        if let Some(fragment) = self.user.full_cover_run_fragment(range.clone()) {
2364            record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2365            return Self::run_booleans(fragment, range);
2366        }
2367        if !self.user.has_any_in_range(range.clone()) {
2368            if let Some(fragment) = self.computed.full_cover_dense_fragment(range.clone()) {
2369                record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2370                return Self::dense_booleans(fragment, range);
2371            }
2372            if let Some(fragment) = self.computed.full_cover_run_fragment(range.clone()) {
2373                record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2374                return Self::run_booleans(fragment, range);
2375            }
2376        }
2377
2378        if !self.has_any_in_range(range.clone()) {
2379            return Arc::new(base.clone());
2380        }
2381
2382        record_overlay_select_stats(|stats| stats.partial_overlay_builds += 1);
2383        let len = range.end.saturating_sub(range.start);
2384        let mut slots = OverlaySlots::<bool>::new(len);
2385        Self::apply_boolean_layer(self.computed, range.clone(), &mut slots);
2386        Self::apply_boolean_layer(self.user, range.clone(), &mut slots);
2387        if !slots.any_present() {
2388            return Arc::new(base.clone());
2389        }
2390
2391        let mut mask_b = BooleanBuilder::with_capacity(len);
2392        let mut values_b = BooleanBuilder::with_capacity(len);
2393        for idx in 0..len {
2394            mask_b.append_value(slots.present[idx]);
2395            match slots.values[idx] {
2396                Some(value) => values_b.append_value(value),
2397                None => values_b.append_null(),
2398            }
2399        }
2400        record_overlay_select_stats(|stats| stats.zip_select_calls += 1);
2401        let mask = mask_b.finish();
2402        let values = values_b.finish();
2403        let zipped =
2404            crate::compute_prelude::zip_select(&mask, &values, base).expect("zip boolean overlay");
2405        Arc::new(
2406            zipped
2407                .as_any()
2408                .downcast_ref::<BooleanArray>()
2409                .expect("boolean overlay zip type")
2410                .clone(),
2411        )
2412    }
2413
2414    pub(crate) fn select_text(
2415        &self,
2416        range: core::ops::Range<usize>,
2417        base: &StringArray,
2418    ) -> ArrayRef {
2419        if let Some(fragment) = self.user.full_cover_dense_fragment(range.clone()) {
2420            record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2421            return Self::dense_text(fragment, range);
2422        }
2423        if let Some(fragment) = self.user.full_cover_run_fragment(range.clone()) {
2424            record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2425            return Self::run_text(fragment, range);
2426        }
2427        if !self.user.has_any_in_range(range.clone()) {
2428            if let Some(fragment) = self.computed.full_cover_dense_fragment(range.clone()) {
2429                record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2430                return Self::dense_text(fragment, range);
2431            }
2432            if let Some(fragment) = self.computed.full_cover_run_fragment(range.clone()) {
2433                record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2434                return Self::run_text(fragment, range);
2435            }
2436        }
2437
2438        if !self.has_any_in_range(range.clone()) {
2439            return Arc::new(base.clone()) as ArrayRef;
2440        }
2441
2442        record_overlay_select_stats(|stats| stats.partial_overlay_builds += 1);
2443        let len = range.end.saturating_sub(range.start);
2444        let mut slots = OverlaySlots::<String>::new(len);
2445        Self::apply_text_layer(self.computed, range.clone(), &mut slots);
2446        Self::apply_text_layer(self.user, range.clone(), &mut slots);
2447        if !slots.any_present() {
2448            return Arc::new(base.clone()) as ArrayRef;
2449        }
2450
2451        let mut mask_b = BooleanBuilder::with_capacity(len);
2452        let mut values_b = StringBuilder::with_capacity(len, len.saturating_mul(8));
2453        for idx in 0..len {
2454            mask_b.append_value(slots.present[idx]);
2455            match &slots.values[idx] {
2456                Some(value) => values_b.append_value(value),
2457                None => values_b.append_null(),
2458            }
2459        }
2460        record_overlay_select_stats(|stats| stats.zip_select_calls += 1);
2461        let mask = mask_b.finish();
2462        let values = values_b.finish();
2463        crate::compute_prelude::zip_select(&mask, &values, base).expect("zip text overlay")
2464    }
2465
2466    pub(crate) fn select_errors(
2467        &self,
2468        range: core::ops::Range<usize>,
2469        base: &UInt8Array,
2470    ) -> Arc<UInt8Array> {
2471        if let Some(fragment) = self.user.full_cover_dense_fragment(range.clone()) {
2472            record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2473            return Self::dense_errors(fragment, range);
2474        }
2475        if let Some(fragment) = self.user.full_cover_run_fragment(range.clone()) {
2476            record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2477            return Self::run_errors(fragment, range);
2478        }
2479        if !self.user.has_any_in_range(range.clone()) {
2480            if let Some(fragment) = self.computed.full_cover_dense_fragment(range.clone()) {
2481                record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2482                return Self::dense_errors(fragment, range);
2483            }
2484            if let Some(fragment) = self.computed.full_cover_run_fragment(range.clone()) {
2485                record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2486                return Self::run_errors(fragment, range);
2487            }
2488        }
2489
2490        if !self.has_any_in_range(range.clone()) {
2491            return Arc::new(base.clone());
2492        }
2493
2494        record_overlay_select_stats(|stats| stats.partial_overlay_builds += 1);
2495        let len = range.end.saturating_sub(range.start);
2496        let mut slots = OverlaySlots::<u8>::new(len);
2497        Self::apply_error_layer(self.computed, range.clone(), &mut slots);
2498        Self::apply_error_layer(self.user, range.clone(), &mut slots);
2499        if !slots.any_present() {
2500            return Arc::new(base.clone());
2501        }
2502
2503        let mut mask_b = BooleanBuilder::with_capacity(len);
2504        let mut values_b = UInt8Builder::with_capacity(len);
2505        for idx in 0..len {
2506            mask_b.append_value(slots.present[idx]);
2507            match slots.values[idx] {
2508                Some(value) => values_b.append_value(value),
2509                None => values_b.append_null(),
2510            }
2511        }
2512        record_overlay_select_stats(|stats| stats.zip_select_calls += 1);
2513        let mask = mask_b.finish();
2514        let values = values_b.finish();
2515        let zipped =
2516            crate::compute_prelude::zip_select(&mask, &values, base).expect("zip error overlay");
2517        Arc::new(
2518            zipped
2519                .as_any()
2520                .downcast_ref::<UInt8Array>()
2521                .expect("error overlay zip type")
2522                .clone(),
2523        )
2524    }
2525
2526    pub(crate) fn select_type_tags(
2527        &self,
2528        range: core::ops::Range<usize>,
2529        base: &UInt8Array,
2530    ) -> Arc<UInt8Array> {
2531        if let Some(fragment) = self.user.full_cover_dense_fragment(range.clone()) {
2532            record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2533            return Self::dense_type_tags(fragment, range);
2534        }
2535        if let Some(fragment) = self.user.full_cover_run_fragment(range.clone()) {
2536            record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2537            return Self::run_type_tags(fragment, range);
2538        }
2539        if !self.user.has_any_in_range(range.clone()) {
2540            if let Some(fragment) = self.computed.full_cover_dense_fragment(range.clone()) {
2541                record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2542                return Self::dense_type_tags(fragment, range);
2543            }
2544            if let Some(fragment) = self.computed.full_cover_run_fragment(range.clone()) {
2545                record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2546                return Self::run_type_tags(fragment, range);
2547            }
2548        }
2549
2550        if !self.has_any_in_range(range.clone()) {
2551            return Arc::new(base.clone());
2552        }
2553
2554        record_overlay_select_stats(|stats| stats.partial_overlay_builds += 1);
2555        let len = range.end.saturating_sub(range.start);
2556        let mut slots = OverlaySlots::<u8>::new(len);
2557        Self::apply_type_tag_layer(self.computed, range.clone(), &mut slots);
2558        Self::apply_type_tag_layer(self.user, range.clone(), &mut slots);
2559        if !slots.any_present() {
2560            return Arc::new(base.clone());
2561        }
2562
2563        let mut mask_b = BooleanBuilder::with_capacity(len);
2564        let mut values_b = UInt8Builder::with_capacity(len);
2565        for idx in 0..len {
2566            mask_b.append_value(slots.present[idx]);
2567            match slots.values[idx] {
2568                Some(value) => values_b.append_value(value),
2569                None => values_b.append_null(),
2570            }
2571        }
2572        record_overlay_select_stats(|stats| stats.zip_select_calls += 1);
2573        let mask = mask_b.finish();
2574        let values = values_b.finish();
2575        let zipped =
2576            crate::compute_prelude::zip_select(&mask, &values, base).expect("zip type-tag overlay");
2577        Arc::new(
2578            zipped
2579                .as_any()
2580                .downcast_ref::<UInt8Array>()
2581                .expect("type-tag overlay zip type")
2582                .clone(),
2583        )
2584    }
2585
2586    pub(crate) fn select_lowered_text(
2587        &self,
2588        range: core::ops::Range<usize>,
2589        base: &StringArray,
2590    ) -> Arc<StringArray> {
2591        if let Some(fragment) = self.user.full_cover_dense_fragment(range.clone()) {
2592            record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2593            return Self::dense_lowered_text(fragment, range);
2594        }
2595        if let Some(fragment) = self.user.full_cover_run_fragment(range.clone()) {
2596            record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2597            return Self::run_lowered_text(fragment, range);
2598        }
2599        if !self.user.has_any_in_range(range.clone()) {
2600            if let Some(fragment) = self.computed.full_cover_dense_fragment(range.clone()) {
2601                record_overlay_select_stats(|stats| stats.direct_dense_slices += 1);
2602                return Self::dense_lowered_text(fragment, range);
2603            }
2604            if let Some(fragment) = self.computed.full_cover_run_fragment(range.clone()) {
2605                record_overlay_select_stats(|stats| stats.direct_run_materializations += 1);
2606                return Self::run_lowered_text(fragment, range);
2607            }
2608        }
2609
2610        if !self.has_any_in_range(range.clone()) {
2611            return Arc::new(base.clone());
2612        }
2613        if self.user.fragments.is_empty() && self.computed.fragments.is_empty() {
2614            return self.select_lowered_text_point_scalar(range, base);
2615        }
2616
2617        record_overlay_select_stats(|stats| stats.partial_overlay_builds += 1);
2618        let len = range.end.saturating_sub(range.start);
2619        let mut slots = OverlaySlots::<String>::new(len);
2620        Self::apply_lowered_text_layer(self.computed, range.clone(), &mut slots);
2621        Self::apply_lowered_text_layer(self.user, range.clone(), &mut slots);
2622        if !slots.any_present() {
2623            return Arc::new(base.clone());
2624        }
2625
2626        let mut mask_b = BooleanBuilder::with_capacity(len);
2627        let mut values_b = StringBuilder::with_capacity(len, len.saturating_mul(8));
2628        for idx in 0..len {
2629            mask_b.append_value(slots.present[idx]);
2630            match &slots.values[idx] {
2631                Some(value) => values_b.append_value(value),
2632                None => values_b.append_null(),
2633            }
2634        }
2635        record_overlay_select_stats(|stats| stats.zip_select_calls += 1);
2636        let mask = mask_b.finish();
2637        let values = values_b.finish();
2638        let zipped = crate::compute_prelude::zip_select(&mask, &values, base)
2639            .expect("zip lowered text overlay");
2640        Arc::new(
2641            zipped
2642                .as_any()
2643                .downcast_ref::<StringArray>()
2644                .expect("lowered text overlay zip type")
2645                .clone(),
2646        )
2647    }
2648
2649    fn select_lowered_text_point_scalar(
2650        &self,
2651        range: core::ops::Range<usize>,
2652        base: &StringArray,
2653    ) -> Arc<StringArray> {
2654        let len = range.end.saturating_sub(range.start);
2655        let mut mask_b = BooleanBuilder::with_capacity(len);
2656        let mut values_b = StringBuilder::with_capacity(len, len.saturating_mul(8));
2657        record_overlay_select_stats(|stats| stats.row_scalar_fallbacks += len);
2658        for off in range {
2659            if let Some(value) = self.get_scalar(off) {
2660                mask_b.append_value(true);
2661                if let Some(s) = value.lowered_text_value() {
2662                    values_b.append_value(&s);
2663                } else {
2664                    values_b.append_null();
2665                }
2666                record_overlay_select_stats(|stats| stats.point_entries_applied += 1);
2667            } else {
2668                mask_b.append_value(false);
2669                values_b.append_null();
2670            }
2671        }
2672        record_overlay_select_stats(|stats| stats.zip_select_calls += 1);
2673        let mask = mask_b.finish();
2674        let values = values_b.finish();
2675        let zipped = crate::compute_prelude::zip_select(&mask, &values, base)
2676            .expect("zip lowered text overlay");
2677        Arc::new(
2678            zipped
2679                .as_any()
2680                .downcast_ref::<StringArray>()
2681                .expect("lowered text overlay zip type")
2682                .clone(),
2683        )
2684    }
2685
2686    fn dense_numbers(
2687        fragment: &OverlayFragment,
2688        range: core::ops::Range<usize>,
2689    ) -> Arc<Float64Array> {
2690        let (rel_start, len, payload) = Self::dense_payload_window(fragment, range);
2691        Self::payload_numbers_slice(payload, rel_start, len)
2692    }
2693
2694    fn dense_booleans(
2695        fragment: &OverlayFragment,
2696        range: core::ops::Range<usize>,
2697    ) -> Arc<BooleanArray> {
2698        let (rel_start, len, payload) = Self::dense_payload_window(fragment, range);
2699        Self::payload_booleans_slice(payload, rel_start, len)
2700    }
2701
2702    fn dense_text(fragment: &OverlayFragment, range: core::ops::Range<usize>) -> ArrayRef {
2703        let (rel_start, len, payload) = Self::dense_payload_window(fragment, range);
2704        Self::payload_text_slice(payload, rel_start, len)
2705    }
2706
2707    fn dense_errors(fragment: &OverlayFragment, range: core::ops::Range<usize>) -> Arc<UInt8Array> {
2708        let (rel_start, len, payload) = Self::dense_payload_window(fragment, range);
2709        Self::payload_errors_slice(payload, rel_start, len)
2710    }
2711
2712    fn dense_type_tags(
2713        fragment: &OverlayFragment,
2714        range: core::ops::Range<usize>,
2715    ) -> Arc<UInt8Array> {
2716        let (rel_start, len, payload) = Self::dense_payload_window(fragment, range);
2717        Self::payload_type_tags_slice(payload, rel_start, len)
2718    }
2719
2720    fn dense_lowered_text(
2721        fragment: &OverlayFragment,
2722        range: core::ops::Range<usize>,
2723    ) -> Arc<StringArray> {
2724        let (rel_start, len, payload) = Self::dense_payload_window(fragment, range);
2725        Self::payload_lowered_text_materialize(payload, rel_start, len)
2726    }
2727
2728    fn dense_payload_window(
2729        fragment: &OverlayFragment,
2730        range: core::ops::Range<usize>,
2731    ) -> (usize, usize, &OverlayFragmentPayload) {
2732        let OverlayFragment::DenseRange { start, payload, .. } = fragment else {
2733            unreachable!("dense payload window requires DenseRange")
2734        };
2735        let rel_start = range.start.saturating_sub(*start as usize);
2736        (rel_start, range.end.saturating_sub(range.start), payload)
2737    }
2738
2739    fn run_numbers(
2740        fragment: &OverlayFragment,
2741        range: core::ops::Range<usize>,
2742    ) -> Arc<Float64Array> {
2743        let mut b = Float64Builder::with_capacity(range.end.saturating_sub(range.start));
2744        Self::for_each_run_payload_index(fragment, range, |payload, run_idx, repeat| {
2745            if let Some(value) = payload.number_at(run_idx) {
2746                for _ in 0..repeat {
2747                    b.append_value(value);
2748                }
2749            } else {
2750                for _ in 0..repeat {
2751                    b.append_null();
2752                }
2753            }
2754        });
2755        Arc::new(b.finish())
2756    }
2757
2758    fn run_booleans(
2759        fragment: &OverlayFragment,
2760        range: core::ops::Range<usize>,
2761    ) -> Arc<BooleanArray> {
2762        let mut b = BooleanBuilder::with_capacity(range.end.saturating_sub(range.start));
2763        Self::for_each_run_payload_index(fragment, range, |payload, run_idx, repeat| {
2764            if let Some(value) = payload.boolean_at(run_idx) {
2765                for _ in 0..repeat {
2766                    b.append_value(value);
2767                }
2768            } else {
2769                for _ in 0..repeat {
2770                    b.append_null();
2771                }
2772            }
2773        });
2774        Arc::new(b.finish())
2775    }
2776
2777    fn run_text(fragment: &OverlayFragment, range: core::ops::Range<usize>) -> ArrayRef {
2778        let mut b = StringBuilder::with_capacity(
2779            range.end.saturating_sub(range.start),
2780            range.end.saturating_sub(range.start).saturating_mul(8),
2781        );
2782        Self::for_each_run_payload_index(fragment, range, |payload, run_idx, repeat| {
2783            if let Some(value) = payload.text_at(run_idx) {
2784                for _ in 0..repeat {
2785                    b.append_value(value);
2786                }
2787            } else {
2788                for _ in 0..repeat {
2789                    b.append_null();
2790                }
2791            }
2792        });
2793        Arc::new(b.finish()) as ArrayRef
2794    }
2795
2796    fn run_errors(fragment: &OverlayFragment, range: core::ops::Range<usize>) -> Arc<UInt8Array> {
2797        let mut b = UInt8Builder::with_capacity(range.end.saturating_sub(range.start));
2798        Self::for_each_run_payload_index(fragment, range, |payload, run_idx, repeat| {
2799            if let Some(value) = payload.error_at(run_idx) {
2800                for _ in 0..repeat {
2801                    b.append_value(value);
2802                }
2803            } else {
2804                for _ in 0..repeat {
2805                    b.append_null();
2806                }
2807            }
2808        });
2809        Arc::new(b.finish())
2810    }
2811
2812    fn run_type_tags(
2813        fragment: &OverlayFragment,
2814        range: core::ops::Range<usize>,
2815    ) -> Arc<UInt8Array> {
2816        let mut b = UInt8Builder::with_capacity(range.end.saturating_sub(range.start));
2817        Self::for_each_run_payload_index(fragment, range, |payload, run_idx, repeat| {
2818            let tag = payload.type_tag_at(run_idx).unwrap_or(TypeTag::Empty) as u8;
2819            for _ in 0..repeat {
2820                b.append_value(tag);
2821            }
2822        });
2823        Arc::new(b.finish())
2824    }
2825
2826    fn run_lowered_text(
2827        fragment: &OverlayFragment,
2828        range: core::ops::Range<usize>,
2829    ) -> Arc<StringArray> {
2830        let mut b = StringBuilder::with_capacity(
2831            range.end.saturating_sub(range.start),
2832            range.end.saturating_sub(range.start).saturating_mul(8),
2833        );
2834        Self::for_each_run_payload_index(fragment, range, |payload, run_idx, repeat| {
2835            let value = Self::payload_lowered_text_at(payload, run_idx);
2836            if let Some(value) = value {
2837                for _ in 0..repeat {
2838                    b.append_value(&value);
2839                }
2840            } else {
2841                for _ in 0..repeat {
2842                    b.append_null();
2843                }
2844            }
2845        });
2846        Arc::new(b.finish())
2847    }
2848
2849    fn payload_numbers_slice(
2850        payload: &OverlayFragmentPayload,
2851        start: usize,
2852        len: usize,
2853    ) -> Arc<Float64Array> {
2854        if let Some(array) = &payload.numbers {
2855            let sliced = array.slice(start, len);
2856            Arc::new(
2857                sliced
2858                    .as_any()
2859                    .downcast_ref::<Float64Array>()
2860                    .unwrap()
2861                    .clone(),
2862            )
2863        } else {
2864            Self::null_numbers(len)
2865        }
2866    }
2867
2868    fn payload_booleans_slice(
2869        payload: &OverlayFragmentPayload,
2870        start: usize,
2871        len: usize,
2872    ) -> Arc<BooleanArray> {
2873        if let Some(array) = &payload.booleans {
2874            let sliced = array.slice(start, len);
2875            Arc::new(
2876                sliced
2877                    .as_any()
2878                    .downcast_ref::<BooleanArray>()
2879                    .unwrap()
2880                    .clone(),
2881            )
2882        } else {
2883            Self::null_booleans(len)
2884        }
2885    }
2886
2887    fn payload_text_slice(payload: &OverlayFragmentPayload, start: usize, len: usize) -> ArrayRef {
2888        if let Some(array) = &payload.text {
2889            array.slice(start, len)
2890        } else {
2891            new_null_array(&DataType::Utf8, len)
2892        }
2893    }
2894
2895    fn payload_errors_slice(
2896        payload: &OverlayFragmentPayload,
2897        start: usize,
2898        len: usize,
2899    ) -> Arc<UInt8Array> {
2900        if let Some(array) = &payload.errors {
2901            let sliced = array.slice(start, len);
2902            Arc::new(
2903                sliced
2904                    .as_any()
2905                    .downcast_ref::<UInt8Array>()
2906                    .unwrap()
2907                    .clone(),
2908            )
2909        } else {
2910            Self::null_errors(len)
2911        }
2912    }
2913
2914    fn payload_type_tags_slice(
2915        payload: &OverlayFragmentPayload,
2916        start: usize,
2917        len: usize,
2918    ) -> Arc<UInt8Array> {
2919        let sliced = payload.type_tags.slice(start, len);
2920        Arc::new(
2921            sliced
2922                .as_any()
2923                .downcast_ref::<UInt8Array>()
2924                .unwrap()
2925                .clone(),
2926        )
2927    }
2928
2929    fn payload_lowered_text_materialize(
2930        payload: &OverlayFragmentPayload,
2931        start: usize,
2932        len: usize,
2933    ) -> Arc<StringArray> {
2934        let mut b = StringBuilder::with_capacity(len, len.saturating_mul(8));
2935        for idx in start..start.saturating_add(len) {
2936            if let Some(value) = Self::payload_lowered_text_at(payload, idx) {
2937                b.append_value(&value);
2938            } else {
2939                b.append_null();
2940            }
2941        }
2942        Arc::new(b.finish())
2943    }
2944
2945    fn payload_lowered_text_at(payload: &OverlayFragmentPayload, idx: usize) -> Option<String> {
2946        match payload.type_tag_at(idx)? {
2947            TypeTag::Text => payload.text_at(idx).map(|value| value.to_lowercase()),
2948            TypeTag::Number | TypeTag::DateTime | TypeTag::Duration => {
2949                payload.number_at(idx).map(|value| value.to_string())
2950            }
2951            TypeTag::Boolean => payload
2952                .boolean_at(idx)
2953                .map(|value| if value { "true" } else { "false" }.to_string()),
2954            TypeTag::Empty | TypeTag::Error | TypeTag::Pending => None,
2955        }
2956    }
2957
2958    fn null_numbers(len: usize) -> Arc<Float64Array> {
2959        let arr = new_null_array(&DataType::Float64, len);
2960        Arc::new(arr.as_any().downcast_ref::<Float64Array>().unwrap().clone())
2961    }
2962
2963    fn null_booleans(len: usize) -> Arc<BooleanArray> {
2964        let arr = new_null_array(&DataType::Boolean, len);
2965        Arc::new(arr.as_any().downcast_ref::<BooleanArray>().unwrap().clone())
2966    }
2967
2968    fn null_errors(len: usize) -> Arc<UInt8Array> {
2969        let arr = new_null_array(&DataType::UInt8, len);
2970        Arc::new(arr.as_any().downcast_ref::<UInt8Array>().unwrap().clone())
2971    }
2972
2973    fn apply_number_layer(
2974        layer: &Overlay,
2975        range: core::ops::Range<usize>,
2976        slots: &mut OverlaySlots<f64>,
2977    ) {
2978        Self::apply_fragment_layer(layer, range.clone(), slots, |payload, idx| {
2979            payload.number_at(idx)
2980        });
2981        for (off, value) in layer.iter_points() {
2982            if range.contains(off) {
2983                slots.set(*off - range.start, value.numeric_lane_value());
2984                record_overlay_select_stats(|stats| stats.point_entries_applied += 1);
2985            }
2986        }
2987    }
2988
2989    fn apply_boolean_layer(
2990        layer: &Overlay,
2991        range: core::ops::Range<usize>,
2992        slots: &mut OverlaySlots<bool>,
2993    ) {
2994        Self::apply_fragment_layer(layer, range.clone(), slots, |payload, idx| {
2995            payload.boolean_at(idx)
2996        });
2997        for (off, value) in layer.iter_points() {
2998            if range.contains(off) {
2999                slots.set(*off - range.start, value.boolean_lane_value());
3000                record_overlay_select_stats(|stats| stats.point_entries_applied += 1);
3001            }
3002        }
3003    }
3004
3005    fn apply_text_layer(
3006        layer: &Overlay,
3007        range: core::ops::Range<usize>,
3008        slots: &mut OverlaySlots<String>,
3009    ) {
3010        Self::apply_fragment_layer(layer, range.clone(), slots, |payload, idx| {
3011            payload.text_at(idx).map(ToString::to_string)
3012        });
3013        for (off, value) in layer.iter_points() {
3014            if range.contains(off) {
3015                slots.set(
3016                    *off - range.start,
3017                    value.text_lane_value().map(ToString::to_string),
3018                );
3019                record_overlay_select_stats(|stats| stats.point_entries_applied += 1);
3020            }
3021        }
3022    }
3023
3024    fn apply_error_layer(
3025        layer: &Overlay,
3026        range: core::ops::Range<usize>,
3027        slots: &mut OverlaySlots<u8>,
3028    ) {
3029        Self::apply_fragment_layer(layer, range.clone(), slots, |payload, idx| {
3030            payload.error_at(idx)
3031        });
3032        for (off, value) in layer.iter_points() {
3033            if range.contains(off) {
3034                slots.set(*off - range.start, value.error_lane_value());
3035                record_overlay_select_stats(|stats| stats.point_entries_applied += 1);
3036            }
3037        }
3038    }
3039
3040    fn apply_type_tag_layer(
3041        layer: &Overlay,
3042        range: core::ops::Range<usize>,
3043        slots: &mut OverlaySlots<u8>,
3044    ) {
3045        Self::apply_fragment_layer(layer, range.clone(), slots, |payload, idx| {
3046            payload.type_tag_at(idx).map(|tag| tag as u8)
3047        });
3048        for (off, value) in layer.iter_points() {
3049            if range.contains(off) {
3050                slots.set(*off - range.start, Some(value.type_tag() as u8));
3051                record_overlay_select_stats(|stats| stats.point_entries_applied += 1);
3052            }
3053        }
3054    }
3055
3056    fn apply_lowered_text_layer(
3057        layer: &Overlay,
3058        range: core::ops::Range<usize>,
3059        slots: &mut OverlaySlots<String>,
3060    ) {
3061        Self::apply_fragment_layer(layer, range.clone(), slots, Self::payload_lowered_text_at);
3062        for (off, value) in layer.iter_points() {
3063            if range.contains(off) {
3064                slots.set(*off - range.start, value.lowered_text_value());
3065                record_overlay_select_stats(|stats| stats.point_entries_applied += 1);
3066            }
3067        }
3068    }
3069
3070    fn apply_fragment_layer<T>(
3071        layer: &Overlay,
3072        range: core::ops::Range<usize>,
3073        slots: &mut OverlaySlots<T>,
3074        mut value_at: impl FnMut(&OverlayFragmentPayload, usize) -> Option<T>,
3075    ) {
3076        for fragment in &layer.fragments {
3077            if !fragment.has_any_in_range(range.clone()) {
3078                continue;
3079            }
3080            Self::record_fragment_intersection(fragment);
3081            Self::for_each_fragment_payload_index(
3082                fragment,
3083                range.clone(),
3084                |out_idx, payload, payload_idx| {
3085                    slots.set(out_idx, value_at(payload, payload_idx));
3086                },
3087            );
3088        }
3089    }
3090
3091    fn record_fragment_intersection(fragment: &OverlayFragment) {
3092        let shape = match fragment {
3093            OverlayFragment::SparseOffsets { .. } => OverlayFragmentShape::Sparse,
3094            OverlayFragment::DenseRange { .. } => OverlayFragmentShape::Dense,
3095            OverlayFragment::RunRange { .. } => OverlayFragmentShape::Run,
3096        };
3097        record_overlay_select_stats(|stats| {
3098            stats.fragment_intersections += 1;
3099            match shape {
3100                OverlayFragmentShape::Sparse => stats.partial_sparse_intersections += 1,
3101                OverlayFragmentShape::Dense => stats.partial_dense_intersections += 1,
3102                OverlayFragmentShape::Run => stats.partial_run_intersections += 1,
3103            }
3104        });
3105    }
3106
3107    fn for_each_fragment_payload_index(
3108        fragment: &OverlayFragment,
3109        range: core::ops::Range<usize>,
3110        mut f: impl FnMut(usize, &OverlayFragmentPayload, usize),
3111    ) {
3112        if range.is_empty() {
3113            return;
3114        }
3115        match fragment {
3116            OverlayFragment::SparseOffsets { offsets, payload } => {
3117                let start = u32::try_from(range.start).unwrap_or(u32::MAX);
3118                let lo = offsets.partition_point(|off| *off < start);
3119                let hi = offsets.partition_point(|off| (*off as usize) < range.end);
3120                for (idx, off) in offsets.iter().enumerate().take(hi).skip(lo) {
3121                    let out_idx = (*off as usize).saturating_sub(range.start);
3122                    f(out_idx, payload, idx);
3123                }
3124            }
3125            OverlayFragment::DenseRange {
3126                start,
3127                len,
3128                payload,
3129            } => {
3130                let frag_start = *start as usize;
3131                let frag_end = frag_start.saturating_add(*len as usize);
3132                let inter_start = frag_start.max(range.start);
3133                let inter_end = frag_end.min(range.end);
3134                if inter_start >= inter_end {
3135                    return;
3136                }
3137                for abs in inter_start..inter_end {
3138                    f(abs - range.start, payload, abs - frag_start);
3139                }
3140            }
3141            OverlayFragment::RunRange {
3142                start,
3143                len,
3144                run_ends,
3145                payload,
3146            } => {
3147                let frag_start = *start as usize;
3148                let frag_end = frag_start.saturating_add(*len as usize);
3149                let inter_start = frag_start.max(range.start);
3150                let inter_end = frag_end.min(range.end);
3151                if inter_start >= inter_end {
3152                    return;
3153                }
3154                let mut prev_end = 0usize;
3155                for (run_idx, run_end) in run_ends.iter().enumerate() {
3156                    let run_start_abs = frag_start.saturating_add(prev_end);
3157                    let run_end_abs = frag_start.saturating_add(*run_end as usize);
3158                    let start_abs = run_start_abs.max(inter_start);
3159                    let end_abs = run_end_abs.min(inter_end);
3160                    if start_abs < end_abs {
3161                        for abs in start_abs..end_abs {
3162                            f(abs - range.start, payload, run_idx);
3163                        }
3164                    }
3165                    prev_end = *run_end as usize;
3166                    if run_end_abs >= inter_end {
3167                        break;
3168                    }
3169                }
3170            }
3171        }
3172    }
3173
3174    fn for_each_run_payload_index(
3175        fragment: &OverlayFragment,
3176        range: core::ops::Range<usize>,
3177        mut f: impl FnMut(&OverlayFragmentPayload, usize, usize),
3178    ) {
3179        let OverlayFragment::RunRange {
3180            start,
3181            len,
3182            run_ends,
3183            payload,
3184        } = fragment
3185        else {
3186            unreachable!("run payload iteration requires RunRange")
3187        };
3188        let frag_start = *start as usize;
3189        let frag_end = frag_start.saturating_add(*len as usize);
3190        let inter_start = frag_start.max(range.start);
3191        let inter_end = frag_end.min(range.end);
3192        if inter_start >= inter_end {
3193            return;
3194        }
3195        let mut prev_end = 0usize;
3196        for (run_idx, run_end) in run_ends.iter().enumerate() {
3197            let run_start_abs = frag_start.saturating_add(prev_end);
3198            let run_end_abs = frag_start.saturating_add(*run_end as usize);
3199            let start_abs = run_start_abs.max(inter_start);
3200            let end_abs = run_end_abs.min(inter_end);
3201            if start_abs < end_abs {
3202                f(payload, run_idx, end_abs - start_abs);
3203            }
3204            prev_end = *run_end as usize;
3205            if run_end_abs >= inter_end {
3206                break;
3207            }
3208        }
3209    }
3210}
3211
3212impl OverlayFragmentPayload {
3213    #[inline]
3214    fn type_tag_at(&self, idx: usize) -> Option<TypeTag> {
3215        if idx >= self.type_tags.len() || self.type_tags.is_null(idx) {
3216            return None;
3217        }
3218        Some(TypeTag::from_u8(self.type_tags.value(idx)))
3219    }
3220}
3221
3222impl Overlay {
3223    fn full_cover_dense_fragment(
3224        &self,
3225        range: core::ops::Range<usize>,
3226    ) -> Option<&OverlayFragment> {
3227        self.full_cover_single_fragment(range, OverlayFragmentShape::Dense)
3228    }
3229
3230    fn full_cover_run_fragment(&self, range: core::ops::Range<usize>) -> Option<&OverlayFragment> {
3231        self.full_cover_single_fragment(range, OverlayFragmentShape::Run)
3232    }
3233
3234    fn full_cover_single_fragment(
3235        &self,
3236        range: core::ops::Range<usize>,
3237        shape: OverlayFragmentShape,
3238    ) -> Option<&OverlayFragment> {
3239        if range.is_empty() || self.points.keys().any(|off| range.contains(off)) {
3240            return None;
3241        }
3242        let mut found = None;
3243        for fragment in &self.fragments {
3244            if !fragment.has_any_in_range(range.clone()) {
3245                continue;
3246            }
3247            let shape_matches = matches!(
3248                (shape, fragment),
3249                (
3250                    OverlayFragmentShape::Dense,
3251                    OverlayFragment::DenseRange { .. }
3252                ) | (OverlayFragmentShape::Run, OverlayFragment::RunRange { .. })
3253            );
3254            let covers = fragment
3255                .interval_coverage()
3256                .is_some_and(|own| own.start <= range.start && range.end <= own.end);
3257            if shape_matches && covers && found.is_none() {
3258                found = Some(fragment);
3259            } else {
3260                return None;
3261            }
3262        }
3263        found
3264    }
3265}
3266fn append_overlay_value_to_lane_builders(
3267    ov: &OverlayValue,
3268    tag_b: &mut UInt8Builder,
3269    nb: &mut Float64Builder,
3270    bb: &mut BooleanBuilder,
3271    sb: &mut StringBuilder,
3272    eb: &mut UInt8Builder,
3273    non_num: &mut usize,
3274    non_bool: &mut usize,
3275    non_text: &mut usize,
3276    non_err: &mut usize,
3277) {
3278    match ov {
3279        OverlayValue::Empty => {
3280            tag_b.append_value(TypeTag::Empty as u8);
3281            nb.append_null();
3282            bb.append_null();
3283            sb.append_null();
3284            eb.append_null();
3285        }
3286        OverlayValue::Number(n) => {
3287            tag_b.append_value(TypeTag::Number as u8);
3288            nb.append_value(*n);
3289            *non_num += 1;
3290            bb.append_null();
3291            sb.append_null();
3292            eb.append_null();
3293        }
3294        OverlayValue::DateTime(serial) => {
3295            tag_b.append_value(TypeTag::DateTime as u8);
3296            nb.append_value(*serial);
3297            *non_num += 1;
3298            bb.append_null();
3299            sb.append_null();
3300            eb.append_null();
3301        }
3302        OverlayValue::Duration(serial) => {
3303            tag_b.append_value(TypeTag::Duration as u8);
3304            nb.append_value(*serial);
3305            *non_num += 1;
3306            bb.append_null();
3307            sb.append_null();
3308            eb.append_null();
3309        }
3310        OverlayValue::Boolean(b) => {
3311            tag_b.append_value(TypeTag::Boolean as u8);
3312            nb.append_null();
3313            bb.append_value(*b);
3314            *non_bool += 1;
3315            sb.append_null();
3316            eb.append_null();
3317        }
3318        OverlayValue::Text(s) => {
3319            tag_b.append_value(TypeTag::Text as u8);
3320            nb.append_null();
3321            bb.append_null();
3322            sb.append_value(s);
3323            *non_text += 1;
3324            eb.append_null();
3325        }
3326        OverlayValue::Error(code) => {
3327            tag_b.append_value(TypeTag::Error as u8);
3328            nb.append_null();
3329            bb.append_null();
3330            sb.append_null();
3331            eb.append_value(*code);
3332            *non_err += 1;
3333        }
3334        OverlayValue::Pending => {
3335            tag_b.append_value(TypeTag::Pending as u8);
3336            nb.append_null();
3337            bb.append_null();
3338            sb.append_null();
3339            eb.append_null();
3340        }
3341    }
3342}
3343
3344impl ArrowSheet {
3345    /// Create a logical sheet whose cells are initially implicit empty values.
3346    ///
3347    /// Columns start with no materialized chunks; callers can populate only touched
3348    /// column/chunk pairs via `set_sparse_overlay_value`. Missing chunks remain
3349    /// observable as empty cells through scalar and range reads.
3350    pub fn new_sparse(sheet_name: &str, ncols: usize, nrows: usize, chunk_rows: usize) -> Self {
3351        let chunk_rows = chunk_rows.max(1);
3352        let columns = (0..ncols)
3353            .map(|idx| ArrowColumn {
3354                chunks: Vec::new(),
3355                sparse_chunks: FxHashMap::default(),
3356                index: idx as u32,
3357            })
3358            .collect();
3359        let mut sheet = Self {
3360            name: Arc::from(sheet_name.to_string()),
3361            columns,
3362            nrows: 0,
3363            chunk_starts: Vec::new(),
3364            chunk_rows,
3365        };
3366        sheet.ensure_row_capacity(nrows);
3367        sheet
3368    }
3369
3370    /// Populate a single sparse cell using the overlay cascade while preserving
3371    /// implicit-empty chunks elsewhere. This is intended for sparse initial ingest,
3372    /// not for user edits; no graph/changelog side effects are triggered here.
3373    pub fn set_sparse_overlay_value(
3374        &mut self,
3375        abs_row: usize,
3376        abs_col: usize,
3377        value: OverlayValue,
3378    ) -> isize {
3379        if abs_col >= self.columns.len() {
3380            let start = self.columns.len();
3381            self.columns
3382                .extend((start..=abs_col).map(|idx| ArrowColumn {
3383                    chunks: Vec::new(),
3384                    sparse_chunks: FxHashMap::default(),
3385                    index: idx as u32,
3386                }));
3387        }
3388        if abs_row >= self.nrows as usize {
3389            self.ensure_row_capacity(abs_row + 1);
3390        }
3391        let Some((ch_idx, in_off)) = self.chunk_of_row(abs_row) else {
3392            return 0;
3393        };
3394        let Some(ch) = self.ensure_column_chunk_mut(abs_col, ch_idx) else {
3395            return 0;
3396        };
3397        ch.overlay.set(in_off, value)
3398    }
3399
3400    /// Return a summary of each column's chunk counts, total rows, and lane presence.
3401    pub fn shape(&self) -> Vec<ColumnShape> {
3402        self.columns
3403            .iter()
3404            .map(|c| {
3405                let chunks = c.chunks.len();
3406                let rows = self.nrows as usize;
3407                let has_num = c.chunks.iter().any(|ch| ch.meta.non_null_num > 0);
3408                let has_bool = c.chunks.iter().any(|ch| ch.meta.non_null_bool > 0);
3409                let has_text = c.chunks.iter().any(|ch| ch.meta.non_null_text > 0);
3410                let has_err = c.chunks.iter().any(|ch| ch.meta.non_null_err > 0);
3411                ColumnShape {
3412                    index: c.index,
3413                    chunks,
3414                    rows,
3415                    has_num,
3416                    has_bool,
3417                    has_text,
3418                    has_err,
3419                }
3420            })
3421            .collect()
3422    }
3423
3424    pub fn range_view(
3425        &self,
3426        sr: usize,
3427        sc: usize,
3428        er: usize,
3429        ec: usize,
3430    ) -> crate::engine::range_view::RangeView<'_> {
3431        let r0 = er.checked_sub(sr).map(|d| d + 1).unwrap_or(0);
3432        let c0 = ec.checked_sub(sc).map(|d| d + 1).unwrap_or(0);
3433        let (rows, cols) = if r0 == 0 || c0 == 0 { (0, 0) } else { (r0, c0) };
3434        crate::engine::range_view::RangeView::new(
3435            crate::engine::range_view::RangeBacking::Borrowed(self),
3436            sr,
3437            sc,
3438            er,
3439            ec,
3440            rows,
3441            cols,
3442        )
3443    }
3444
3445    /// Fast single-cell read (0-based row/col) with overlay precedence.
3446    ///
3447    /// This avoids constructing a 1x1 RangeView and is intended for tight read loops.
3448    #[inline]
3449    pub fn get_cell_value(&self, abs_row: usize, abs_col: usize) -> LiteralValue {
3450        let sheet_rows = self.nrows as usize;
3451        if abs_row >= sheet_rows {
3452            return LiteralValue::Empty;
3453        }
3454        if abs_col >= self.columns.len() {
3455            return LiteralValue::Empty;
3456        }
3457        let Some((ch_idx, in_off)) = self.chunk_of_row(abs_row) else {
3458            return LiteralValue::Empty;
3459        };
3460        let col_ref = &self.columns[abs_col];
3461        let Some(ch) = col_ref.chunk(ch_idx) else {
3462            return LiteralValue::Empty;
3463        };
3464
3465        // Overlay takes precedence: user edits over computed over base.
3466        let cascade = OverlayCascade::new(&ch.overlay, &ch.computed_overlay);
3467        if let Some(ov) = cascade.get_scalar(in_off) {
3468            return ov.to_literal();
3469        }
3470
3471        // Read tag and route to lane.
3472        let tag_u8 = ch.type_tag.value(in_off);
3473        match TypeTag::from_u8(tag_u8) {
3474            TypeTag::Empty => LiteralValue::Empty,
3475            TypeTag::Number => {
3476                if let Some(arr) = &ch.numbers {
3477                    if arr.is_null(in_off) {
3478                        return LiteralValue::Empty;
3479                    }
3480                    LiteralValue::Number(arr.value(in_off))
3481                } else {
3482                    LiteralValue::Empty
3483                }
3484            }
3485            TypeTag::DateTime => {
3486                if let Some(arr) = &ch.numbers {
3487                    if arr.is_null(in_off) {
3488                        return LiteralValue::Empty;
3489                    }
3490                    LiteralValue::from_serial_number(arr.value(in_off))
3491                } else {
3492                    LiteralValue::Empty
3493                }
3494            }
3495            TypeTag::Duration => {
3496                if let Some(arr) = &ch.numbers {
3497                    if arr.is_null(in_off) {
3498                        return LiteralValue::Empty;
3499                    }
3500                    let serial = arr.value(in_off);
3501                    let nanos_f = serial * 86_400.0 * 1_000_000_000.0;
3502                    let nanos = nanos_f.round().clamp(i64::MIN as f64, i64::MAX as f64) as i64;
3503                    LiteralValue::Duration(chrono::Duration::nanoseconds(nanos))
3504                } else {
3505                    LiteralValue::Empty
3506                }
3507            }
3508            TypeTag::Boolean => {
3509                if let Some(arr) = &ch.booleans {
3510                    if arr.is_null(in_off) {
3511                        return LiteralValue::Empty;
3512                    }
3513                    LiteralValue::Boolean(arr.value(in_off))
3514                } else {
3515                    LiteralValue::Empty
3516                }
3517            }
3518            TypeTag::Text => {
3519                if let Some(arr) = &ch.text {
3520                    if arr.is_null(in_off) {
3521                        return LiteralValue::Empty;
3522                    }
3523                    let sa = arr
3524                        .as_any()
3525                        .downcast_ref::<arrow_array::StringArray>()
3526                        .unwrap();
3527                    LiteralValue::Text(sa.value(in_off).to_string())
3528                } else {
3529                    LiteralValue::Empty
3530                }
3531            }
3532            TypeTag::Error => {
3533                if let Some(arr) = &ch.errors {
3534                    if arr.is_null(in_off) {
3535                        return LiteralValue::Empty;
3536                    }
3537                    let kind = unmap_error_code(arr.value(in_off));
3538                    LiteralValue::Error(ExcelError::new(kind))
3539                } else {
3540                    LiteralValue::Empty
3541                }
3542            }
3543            TypeTag::Pending => LiteralValue::Pending,
3544        }
3545    }
3546
3547    /// Ensure capacity to address at least `target_rows` rows by extending the row chunk map.
3548    ///
3549    /// This updates `chunk_starts`/`nrows` but does **not** eagerly densify all columns with
3550    /// new empty chunks. Missing chunks are treated as all-empty and can be materialized lazily.
3551    pub fn ensure_row_capacity(&mut self, target_rows: usize) {
3552        if target_rows as u32 <= self.nrows {
3553            return;
3554        }
3555
3556        let chunk_size = self.chunk_rows.max(1);
3557
3558        // `chunk_starts` must represent fixed-size chunk boundaries based on `chunk_rows`, not
3559        // incremental growth steps. In particular, repeated calls like ensure_row_capacity(1),
3560        // ensure_row_capacity(2), ... must NOT create a new chunk per row.
3561        if self.chunk_starts.is_empty() {
3562            self.chunk_starts.push(0);
3563        }
3564
3565        // Extend chunk starts only when `target_rows` crosses a chunk boundary.
3566        // Example: chunk_size=3, target_rows=6 => chunk_starts=[0,3]
3567        let mut next_start = self
3568            .chunk_starts
3569            .last()
3570            .copied()
3571            .unwrap_or(0)
3572            .saturating_add(chunk_size);
3573        while next_start < target_rows {
3574            self.chunk_starts.push(next_start);
3575            next_start = next_start.saturating_add(chunk_size);
3576        }
3577
3578        self.nrows = target_rows as u32;
3579
3580        // Any previously-materialized chunk may have been created when the sheet had fewer rows.
3581        // When `chunk_starts` extends, chunks that used to be "last" can become interior chunks
3582        // with a larger fixed boundary. Ensure materialized chunks are grown to their current
3583        // boundary-derived length so RangeView slicing stays in-bounds.
3584        let starts = self.chunk_starts.clone();
3585        let nrows = self.nrows as usize;
3586        let required_len_for = |ch_idx: usize| -> Option<usize> {
3587            let start = *starts.get(ch_idx)?;
3588            let end = starts.get(ch_idx + 1).copied().unwrap_or(nrows);
3589            Some(end.saturating_sub(start))
3590        };
3591
3592        for col in &mut self.columns {
3593            for (idx, ch) in col.chunks.iter_mut().enumerate() {
3594                if let Some(req) = required_len_for(idx) {
3595                    ch.grow_len_to(req);
3596                }
3597            }
3598            if !col.sparse_chunks.is_empty() {
3599                let keys: Vec<usize> = col.sparse_chunks.keys().copied().collect();
3600                for idx in keys {
3601                    if let (Some(req), Some(ch)) =
3602                        (required_len_for(idx), col.sparse_chunks.get_mut(&idx))
3603                    {
3604                        ch.grow_len_to(req);
3605                    }
3606                }
3607            }
3608        }
3609    }
3610
3611    /// Ensure a mutable chunk for a given column/chunk index.
3612    ///
3613    /// If the chunk is beyond the column's dense chunk vector, it is stored in `sparse_chunks`.
3614    pub fn ensure_column_chunk_mut(
3615        &mut self,
3616        col_idx: usize,
3617        ch_idx: usize,
3618    ) -> Option<&mut ColumnChunk> {
3619        let start = *self.chunk_starts.get(ch_idx)?;
3620        let end = self
3621            .chunk_starts
3622            .get(ch_idx + 1)
3623            .copied()
3624            .unwrap_or(self.nrows as usize);
3625        let len = end.saturating_sub(start);
3626
3627        let col = self.columns.get_mut(col_idx)?;
3628        if ch_idx < col.chunks.len() {
3629            return Some(&mut col.chunks[ch_idx]);
3630        }
3631        Some(
3632            col.sparse_chunks
3633                .entry(ch_idx)
3634                .or_insert_with(|| Self::make_empty_chunk(len)),
3635        )
3636    }
3637
3638    /// Return (chunk_idx, in_chunk_offset) for absolute 0-based row.
3639    pub fn chunk_of_row(&self, abs_row: usize) -> Option<(usize, usize)> {
3640        if abs_row >= self.nrows as usize {
3641            return None;
3642        }
3643        let ch_idx = match self.chunk_starts.binary_search(&abs_row) {
3644            Ok(i) => i,
3645            Err(0) => 0,
3646            Err(i) => i - 1,
3647        };
3648        let start = self.chunk_starts[ch_idx];
3649        Some((ch_idx, abs_row - start))
3650    }
3651
3652    fn recompute_chunk_starts(&mut self) {
3653        self.chunk_starts.clear();
3654        if let Some(col0) = self.columns.first() {
3655            let mut cur = 0usize;
3656            for ch in &col0.chunks {
3657                self.chunk_starts.push(cur);
3658                cur += ch.type_tag.len();
3659            }
3660        }
3661    }
3662
3663    fn make_empty_chunk(len: usize) -> ColumnChunk {
3664        ColumnChunk {
3665            numbers: None,
3666            booleans: None,
3667            text: None,
3668            errors: None,
3669            type_tag: Arc::new(UInt8Array::from(vec![TypeTag::Empty as u8; len])),
3670            formula_id: None,
3671            meta: ColumnChunkMeta {
3672                len,
3673                non_null_num: 0,
3674                non_null_bool: 0,
3675                non_null_text: 0,
3676                non_null_err: 0,
3677            },
3678            lazy_null_numbers: OnceCell::new(),
3679            lazy_null_booleans: OnceCell::new(),
3680            lazy_null_text: OnceCell::new(),
3681            lazy_null_errors: OnceCell::new(),
3682            lowered_text: OnceCell::new(),
3683            overlay: Overlay::new(),
3684            computed_overlay: Overlay::new(),
3685        }
3686    }
3687
3688    fn slice_chunk(ch: &ColumnChunk, off: usize, len: usize) -> ColumnChunk {
3689        // Slice type tags
3690        use arrow_array::Array;
3691        let type_tag: Arc<UInt8Array> = Arc::new(
3692            Array::slice(ch.type_tag.as_ref(), off, len)
3693                .as_any()
3694                .downcast_ref::<UInt8Array>()
3695                .unwrap()
3696                .clone(),
3697        );
3698        // Slice numbers if present and keep only if any non-null
3699        let numbers: Option<Arc<Float64Array>> = ch.numbers.as_ref().and_then(|a| {
3700            let sl = Array::slice(a.as_ref(), off, len);
3701            let fa = sl.as_any().downcast_ref::<Float64Array>().unwrap().clone();
3702            let nn = len.saturating_sub(fa.null_count());
3703            if nn == 0 { None } else { Some(Arc::new(fa)) }
3704        });
3705        let booleans: Option<Arc<BooleanArray>> = ch.booleans.as_ref().and_then(|a| {
3706            let sl = Array::slice(a.as_ref(), off, len);
3707            let ba = sl.as_any().downcast_ref::<BooleanArray>().unwrap().clone();
3708            let nn = len.saturating_sub(ba.null_count());
3709            if nn == 0 { None } else { Some(Arc::new(ba)) }
3710        });
3711        let text: Option<ArrayRef> = ch.text.as_ref().and_then(|a| {
3712            let sl = Array::slice(a.as_ref(), off, len);
3713            let sa = sl.as_any().downcast_ref::<StringArray>().unwrap().clone();
3714            let nn = len.saturating_sub(sa.null_count());
3715            if nn == 0 {
3716                None
3717            } else {
3718                Some(Arc::new(sa) as ArrayRef)
3719            }
3720        });
3721        let errors: Option<Arc<UInt8Array>> = ch.errors.as_ref().and_then(|a| {
3722            let sl = Array::slice(a.as_ref(), off, len);
3723            let ea = sl.as_any().downcast_ref::<UInt8Array>().unwrap().clone();
3724            let nn = len.saturating_sub(ea.null_count());
3725            if nn == 0 { None } else { Some(Arc::new(ea)) }
3726        });
3727        // Split overlays for this slice.
3728        let overlay = ch.overlay.slice(off, len);
3729        let computed_overlay = ch.computed_overlay.slice(off, len);
3730        let non_null_num = numbers.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
3731        let non_null_bool = booleans.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
3732        let non_null_text = text.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
3733        let non_null_err = errors.as_ref().map(|a| len - a.null_count()).unwrap_or(0);
3734        ColumnChunk {
3735            numbers: numbers.clone(),
3736            booleans: booleans.clone(),
3737            text: text.clone(),
3738            errors: errors.clone(),
3739            type_tag,
3740            formula_id: None,
3741            meta: ColumnChunkMeta {
3742                len,
3743                non_null_num,
3744                non_null_bool,
3745                non_null_text,
3746                non_null_err,
3747            },
3748            lazy_null_numbers: OnceCell::new(),
3749            lazy_null_booleans: OnceCell::new(),
3750            lazy_null_text: OnceCell::new(),
3751            lazy_null_errors: OnceCell::new(),
3752            lowered_text: OnceCell::new(),
3753            overlay,
3754            computed_overlay,
3755        }
3756    }
3757
3758    /// Heuristic compaction: rebuilds a chunk's base arrays by applying its overlay when
3759    /// overlay density crosses thresholds. Returns true if a rebuild occurred.
3760    pub fn maybe_compact_chunk(
3761        &mut self,
3762        col_idx: usize,
3763        ch_idx: usize,
3764        abs_threshold: usize,
3765        frac_den: usize,
3766    ) -> usize {
3767        if col_idx >= self.columns.len() {
3768            return 0;
3769        }
3770
3771        let (len, tags, numbers, booleans, text, errors, non_num, non_bool, non_text, non_err) = {
3772            let Some(ch_ref) = self.columns[col_idx].chunk(ch_idx) else {
3773                return 0;
3774            };
3775            let len = ch_ref.type_tag.len();
3776            if len == 0 {
3777                return 0;
3778            }
3779
3780            let ov_len = ch_ref.overlay.len();
3781            let den = frac_den.max(1);
3782            let trig = ov_len > (len / den) || ov_len > abs_threshold;
3783            if !trig {
3784                return 0;
3785            }
3786
3787            // Rebuild: merge base lanes with overlays row-by-row.
3788            let mut tag_b = UInt8Builder::with_capacity(len);
3789            let mut nb = Float64Builder::with_capacity(len);
3790            let mut bb = BooleanBuilder::with_capacity(len);
3791            let mut sb = StringBuilder::with_capacity(len, len * 8);
3792            let mut eb = UInt8Builder::with_capacity(len);
3793            let mut non_num = 0usize;
3794            let mut non_bool = 0usize;
3795            let mut non_text = 0usize;
3796            let mut non_err = 0usize;
3797
3798            for i in 0..len {
3799                // If overlay present, use it. Otherwise, use base tag+lane.
3800                if let Some(ov) = ch_ref.overlay.get_scalar(i) {
3801                    let ov = ov.to_overlay_value();
3802                    append_overlay_value_to_lane_builders(
3803                        &ov,
3804                        &mut tag_b,
3805                        &mut nb,
3806                        &mut bb,
3807                        &mut sb,
3808                        &mut eb,
3809                        &mut non_num,
3810                        &mut non_bool,
3811                        &mut non_text,
3812                        &mut non_err,
3813                    );
3814                } else {
3815                    let tag = TypeTag::from_u8(ch_ref.type_tag.value(i));
3816                    match tag {
3817                        TypeTag::Empty => {
3818                            tag_b.append_value(TypeTag::Empty as u8);
3819                            nb.append_null();
3820                            bb.append_null();
3821                            sb.append_null();
3822                            eb.append_null();
3823                        }
3824                        TypeTag::Number | TypeTag::DateTime | TypeTag::Duration => {
3825                            tag_b.append_value(tag as u8);
3826                            if let Some(a) = &ch_ref.numbers {
3827                                let fa = a.as_any().downcast_ref::<Float64Array>().unwrap();
3828                                if fa.is_null(i) {
3829                                    nb.append_null();
3830                                } else {
3831                                    nb.append_value(fa.value(i));
3832                                    non_num += 1;
3833                                }
3834                            } else {
3835                                nb.append_null();
3836                            }
3837                            bb.append_null();
3838                            sb.append_null();
3839                            eb.append_null();
3840                        }
3841                        TypeTag::Boolean => {
3842                            tag_b.append_value(TypeTag::Boolean as u8);
3843                            nb.append_null();
3844                            if let Some(a) = &ch_ref.booleans {
3845                                let ba = a.as_any().downcast_ref::<BooleanArray>().unwrap();
3846                                if ba.is_null(i) {
3847                                    bb.append_null();
3848                                } else {
3849                                    bb.append_value(ba.value(i));
3850                                    non_bool += 1;
3851                                }
3852                            } else {
3853                                bb.append_null();
3854                            }
3855                            sb.append_null();
3856                            eb.append_null();
3857                        }
3858                        TypeTag::Text => {
3859                            tag_b.append_value(TypeTag::Text as u8);
3860                            nb.append_null();
3861                            bb.append_null();
3862                            if let Some(a) = &ch_ref.text {
3863                                let sa = a.as_any().downcast_ref::<StringArray>().unwrap();
3864                                if sa.is_null(i) {
3865                                    sb.append_null();
3866                                } else {
3867                                    sb.append_value(sa.value(i));
3868                                    non_text += 1;
3869                                }
3870                            } else {
3871                                sb.append_null();
3872                            }
3873                            eb.append_null();
3874                        }
3875                        TypeTag::Error => {
3876                            tag_b.append_value(TypeTag::Error as u8);
3877                            nb.append_null();
3878                            bb.append_null();
3879                            sb.append_null();
3880                            if let Some(a) = &ch_ref.errors {
3881                                let ea = a.as_any().downcast_ref::<UInt8Array>().unwrap();
3882                                if ea.is_null(i) {
3883                                    eb.append_null();
3884                                } else {
3885                                    eb.append_value(ea.value(i));
3886                                    non_err += 1;
3887                                }
3888                            } else {
3889                                eb.append_null();
3890                            }
3891                        }
3892                        TypeTag::Pending => {
3893                            tag_b.append_value(TypeTag::Pending as u8);
3894                            nb.append_null();
3895                            bb.append_null();
3896                            sb.append_null();
3897                            eb.append_null();
3898                        }
3899                    }
3900                }
3901            }
3902
3903            let tags = Arc::new(tag_b.finish());
3904            let numbers = {
3905                let a = nb.finish();
3906                if non_num == 0 {
3907                    None
3908                } else {
3909                    Some(Arc::new(a))
3910                }
3911            };
3912            let booleans = {
3913                let a = bb.finish();
3914                if non_bool == 0 {
3915                    None
3916                } else {
3917                    Some(Arc::new(a))
3918                }
3919            };
3920            let text = {
3921                let a = sb.finish();
3922                if non_text == 0 {
3923                    None
3924                } else {
3925                    Some(Arc::new(a) as ArrayRef)
3926                }
3927            };
3928            let errors = {
3929                let a = eb.finish();
3930                if non_err == 0 {
3931                    None
3932                } else {
3933                    Some(Arc::new(a))
3934                }
3935            };
3936
3937            (
3938                len, tags, numbers, booleans, text, errors, non_num, non_bool, non_text, non_err,
3939            )
3940        };
3941
3942        let Some(ch_mut) = self.columns[col_idx].chunk_mut(ch_idx) else {
3943            return 0;
3944        };
3945
3946        ch_mut.type_tag = tags;
3947        ch_mut.numbers = numbers;
3948        ch_mut.booleans = booleans;
3949        ch_mut.text = text;
3950        ch_mut.errors = errors;
3951        let freed = ch_mut.overlay.clear();
3952        ch_mut.lowered_text = OnceCell::new();
3953        ch_mut.meta.len = len;
3954        ch_mut.meta.non_null_num = non_num;
3955        ch_mut.meta.non_null_bool = non_bool;
3956        ch_mut.meta.non_null_text = non_text;
3957        ch_mut.meta.non_null_err = non_err;
3958        freed
3959    }
3960
3961    /// Compact a dense chunk's computed overlay into its base arrays, freeing overlay memory
3962    /// while preserving the data. Returns the number of bytes freed.
3963    ///
3964    /// This is the computed-overlay counterpart of `maybe_compact_chunk` (which compacts
3965    /// user-edit overlays). The read cascade is `overlay → computed_overlay → base`, so
3966    /// folding computed overlay entries into base arrays is transparent: the `overlay` layer
3967    /// (user edits) is left untouched and still takes precedence on reads.
3968    pub fn compact_computed_overlay_chunk(&mut self, col_idx: usize, ch_idx: usize) -> usize {
3969        if col_idx >= self.columns.len() {
3970            return 0;
3971        }
3972
3973        let (len, tags, numbers, booleans, text, errors, non_num, non_bool, non_text, non_err) = {
3974            let Some(ch_ref) = self.columns[col_idx].chunk(ch_idx) else {
3975                return 0;
3976            };
3977            let len = ch_ref.type_tag.len();
3978            if len == 0 || ch_ref.computed_overlay.is_empty() {
3979                return 0;
3980            }
3981
3982            let mut tag_b = UInt8Builder::with_capacity(len);
3983            let mut nb = Float64Builder::with_capacity(len);
3984            let mut bb = BooleanBuilder::with_capacity(len);
3985            let mut sb = StringBuilder::with_capacity(len, len * 8);
3986            let mut eb = UInt8Builder::with_capacity(len);
3987            let mut non_num = 0usize;
3988            let mut non_bool = 0usize;
3989            let mut non_text = 0usize;
3990            let mut non_err = 0usize;
3991
3992            for i in 0..len {
3993                if let Some(ov) = ch_ref.computed_overlay.get_scalar(i) {
3994                    let ov = ov.to_overlay_value();
3995                    append_overlay_value_to_lane_builders(
3996                        &ov,
3997                        &mut tag_b,
3998                        &mut nb,
3999                        &mut bb,
4000                        &mut sb,
4001                        &mut eb,
4002                        &mut non_num,
4003                        &mut non_bool,
4004                        &mut non_text,
4005                        &mut non_err,
4006                    );
4007                } else {
4008                    let tag = TypeTag::from_u8(ch_ref.type_tag.value(i));
4009                    match tag {
4010                        TypeTag::Empty => {
4011                            tag_b.append_value(TypeTag::Empty as u8);
4012                            nb.append_null();
4013                            bb.append_null();
4014                            sb.append_null();
4015                            eb.append_null();
4016                        }
4017                        TypeTag::Number | TypeTag::DateTime | TypeTag::Duration => {
4018                            tag_b.append_value(tag as u8);
4019                            if let Some(a) = &ch_ref.numbers {
4020                                let fa = a.as_any().downcast_ref::<Float64Array>().unwrap();
4021                                if fa.is_null(i) {
4022                                    nb.append_null();
4023                                } else {
4024                                    nb.append_value(fa.value(i));
4025                                    non_num += 1;
4026                                }
4027                            } else {
4028                                nb.append_null();
4029                            }
4030                            bb.append_null();
4031                            sb.append_null();
4032                            eb.append_null();
4033                        }
4034                        TypeTag::Boolean => {
4035                            tag_b.append_value(TypeTag::Boolean as u8);
4036                            nb.append_null();
4037                            if let Some(a) = &ch_ref.booleans {
4038                                let ba = a.as_any().downcast_ref::<BooleanArray>().unwrap();
4039                                if ba.is_null(i) {
4040                                    bb.append_null();
4041                                } else {
4042                                    bb.append_value(ba.value(i));
4043                                    non_bool += 1;
4044                                }
4045                            } else {
4046                                bb.append_null();
4047                            }
4048                            sb.append_null();
4049                            eb.append_null();
4050                        }
4051                        TypeTag::Text => {
4052                            tag_b.append_value(TypeTag::Text as u8);
4053                            nb.append_null();
4054                            bb.append_null();
4055                            if let Some(a) = &ch_ref.text {
4056                                let sa = a.as_any().downcast_ref::<StringArray>().unwrap();
4057                                if sa.is_null(i) {
4058                                    sb.append_null();
4059                                } else {
4060                                    sb.append_value(sa.value(i));
4061                                    non_text += 1;
4062                                }
4063                            } else {
4064                                sb.append_null();
4065                            }
4066                            eb.append_null();
4067                        }
4068                        TypeTag::Error => {
4069                            tag_b.append_value(TypeTag::Error as u8);
4070                            nb.append_null();
4071                            bb.append_null();
4072                            sb.append_null();
4073                            if let Some(a) = &ch_ref.errors {
4074                                let ea = a.as_any().downcast_ref::<UInt8Array>().unwrap();
4075                                if ea.is_null(i) {
4076                                    eb.append_null();
4077                                } else {
4078                                    eb.append_value(ea.value(i));
4079                                    non_err += 1;
4080                                }
4081                            } else {
4082                                eb.append_null();
4083                            }
4084                        }
4085                        TypeTag::Pending => {
4086                            tag_b.append_value(TypeTag::Pending as u8);
4087                            nb.append_null();
4088                            bb.append_null();
4089                            sb.append_null();
4090                            eb.append_null();
4091                        }
4092                    }
4093                }
4094            }
4095
4096            let tags = Arc::new(tag_b.finish());
4097            let numbers = {
4098                let a = nb.finish();
4099                if non_num == 0 {
4100                    None
4101                } else {
4102                    Some(Arc::new(a))
4103                }
4104            };
4105            let booleans = {
4106                let a = bb.finish();
4107                if non_bool == 0 {
4108                    None
4109                } else {
4110                    Some(Arc::new(a))
4111                }
4112            };
4113            let text = {
4114                let a = sb.finish();
4115                if non_text == 0 {
4116                    None
4117                } else {
4118                    Some(Arc::new(a) as ArrayRef)
4119                }
4120            };
4121            let errors = {
4122                let a = eb.finish();
4123                if non_err == 0 {
4124                    None
4125                } else {
4126                    Some(Arc::new(a))
4127                }
4128            };
4129
4130            (
4131                len, tags, numbers, booleans, text, errors, non_num, non_bool, non_text, non_err,
4132            )
4133        };
4134
4135        let Some(ch_mut) = self.columns[col_idx].chunk_mut(ch_idx) else {
4136            return 0;
4137        };
4138
4139        ch_mut.type_tag = tags;
4140        ch_mut.numbers = numbers;
4141        ch_mut.booleans = booleans;
4142        ch_mut.text = text;
4143        ch_mut.errors = errors;
4144        let freed = ch_mut.computed_overlay.clear();
4145        ch_mut.lowered_text = OnceCell::new();
4146        ch_mut.meta.len = len;
4147        ch_mut.meta.non_null_num = non_num;
4148        ch_mut.meta.non_null_bool = non_bool;
4149        ch_mut.meta.non_null_text = non_text;
4150        ch_mut.meta.non_null_err = non_err;
4151        freed
4152    }
4153
4154    /// Compact a sparse chunk's computed overlay into its base arrays.
4155    /// Equivalent to `compact_computed_overlay_chunk` but for sparse chunks.
4156    pub fn compact_computed_overlay_sparse_chunk(
4157        &mut self,
4158        col_idx: usize,
4159        ch_idx: usize,
4160    ) -> usize {
4161        // Sparse chunks are accessed via the same chunk/chunk_mut API,
4162        // so we delegate to the dense method which already handles both.
4163        self.compact_computed_overlay_chunk(col_idx, ch_idx)
4164    }
4165
4166    /// Insert `count` rows before absolute 0-based row `before`.
4167    pub fn insert_rows(&mut self, before: usize, count: usize) {
4168        if count == 0 {
4169            return;
4170        }
4171
4172        let total_rows = self.nrows as usize;
4173        if total_rows == 0 {
4174            self.nrows = count as u32;
4175            if self.nrows > 0 && self.chunk_starts.is_empty() {
4176                self.chunk_starts.push(0);
4177            }
4178            return;
4179        }
4180
4181        // Ensure a valid chunk map for non-empty sheets.
4182        if self.chunk_starts.is_empty() {
4183            self.chunk_starts.push(0);
4184        }
4185
4186        // "Dense" mode: every column has every chunk (legacy invariant).
4187        let dense_aligned = self
4188            .columns
4189            .iter()
4190            .all(|c| c.sparse_chunks.is_empty() && c.chunks.len() == self.chunk_starts.len());
4191
4192        let insert_at = before.min(total_rows);
4193        let (split_idx, split_off) = if insert_at == total_rows {
4194            // Append at end: split after last chunk.
4195            let last_idx = self.chunk_starts.len() - 1;
4196            let last_start = self.chunk_starts[last_idx];
4197            let last_len = total_rows.saturating_sub(last_start);
4198            (last_idx, last_len)
4199        } else {
4200            self.chunk_of_row(insert_at).unwrap_or((0, 0))
4201        };
4202
4203        if dense_aligned {
4204            // Rebuild chunks for each column (including inserted empty chunk) and recompute starts.
4205            for col in &mut self.columns {
4206                let mut new_chunks: Vec<ColumnChunk> = Vec::with_capacity(col.chunks.len() + 2);
4207                for i in 0..col.chunks.len() {
4208                    if i != split_idx {
4209                        new_chunks.push(col.chunks[i].clone());
4210                    } else {
4211                        let orig = &col.chunks[i];
4212                        let len = orig.type_tag.len();
4213                        if split_off > 0 {
4214                            new_chunks.push(Self::slice_chunk(orig, 0, split_off));
4215                        }
4216                        new_chunks.push(Self::make_empty_chunk(count));
4217                        if split_off < len {
4218                            new_chunks.push(Self::slice_chunk(orig, split_off, len - split_off));
4219                        }
4220                    }
4221                }
4222                col.chunks = new_chunks;
4223                col.sparse_chunks.clear();
4224            }
4225            self.nrows = (total_rows + count) as u32;
4226            self.recompute_chunk_starts();
4227            return;
4228        }
4229
4230        // Sparse-aware mode: `chunk_starts` is authoritative and missing chunks are treated as empty.
4231        #[derive(Clone, Copy)]
4232        enum PlanItem {
4233            Slice {
4234                old_idx: usize,
4235                off: usize,
4236                len: usize,
4237            },
4238            Empty {
4239                len: usize,
4240            },
4241        }
4242
4243        let mut plan: Vec<PlanItem> = Vec::with_capacity(self.chunk_starts.len() + 2);
4244        for old_idx in 0..self.chunk_starts.len() {
4245            let ch_start = self.chunk_starts[old_idx];
4246            let ch_end = self
4247                .chunk_starts
4248                .get(old_idx + 1)
4249                .copied()
4250                .unwrap_or(total_rows);
4251            let ch_len = ch_end.saturating_sub(ch_start);
4252            if ch_len == 0 {
4253                continue;
4254            }
4255
4256            if old_idx != split_idx {
4257                plan.push(PlanItem::Slice {
4258                    old_idx,
4259                    off: 0,
4260                    len: ch_len,
4261                });
4262                continue;
4263            }
4264
4265            let left_len = split_off.min(ch_len);
4266            let right_len = ch_len.saturating_sub(left_len);
4267            if left_len > 0 {
4268                plan.push(PlanItem::Slice {
4269                    old_idx,
4270                    off: 0,
4271                    len: left_len,
4272                });
4273            }
4274            plan.push(PlanItem::Empty { len: count });
4275            if right_len > 0 {
4276                plan.push(PlanItem::Slice {
4277                    old_idx,
4278                    off: left_len,
4279                    len: right_len,
4280                });
4281            }
4282        }
4283
4284        let mut new_starts: Vec<usize> = Vec::with_capacity(plan.len());
4285        let mut cur = 0usize;
4286        for item in &plan {
4287            let len = match *item {
4288                PlanItem::Slice { len, .. } => len,
4289                PlanItem::Empty { len } => len,
4290            };
4291            if len == 0 {
4292                continue;
4293            }
4294            new_starts.push(cur);
4295            cur = cur.saturating_add(len);
4296        }
4297
4298        debug_assert_eq!(cur, total_rows.saturating_add(count));
4299
4300        // Update sheet row layout first.
4301        self.nrows = (total_rows + count) as u32;
4302        self.chunk_starts = new_starts;
4303
4304        // Rebuild stored chunks per column using the plan.
4305        for col in &mut self.columns {
4306            let old_dense = std::mem::take(&mut col.chunks);
4307            let old_sparse = std::mem::take(&mut col.sparse_chunks);
4308            let get_old = |idx: usize| -> Option<&ColumnChunk> {
4309                if idx < old_dense.len() {
4310                    Some(&old_dense[idx])
4311                } else {
4312                    old_sparse.get(&idx)
4313                }
4314            };
4315
4316            let mut dense: Vec<ColumnChunk> = Vec::new();
4317            let mut sparse: FxHashMap<usize, ColumnChunk> = FxHashMap::default();
4318            let mut dense_prefix = true;
4319
4320            for (new_idx, item) in plan.iter().enumerate() {
4321                let produced: Option<ColumnChunk> = match *item {
4322                    PlanItem::Empty { .. } => None,
4323                    PlanItem::Slice { old_idx, off, len } => match get_old(old_idx) {
4324                        Some(orig) => {
4325                            if off == 0 && len == orig.type_tag.len() {
4326                                Some(orig.clone())
4327                            } else {
4328                                Some(Self::slice_chunk(orig, off, len))
4329                            }
4330                        }
4331                        None => None,
4332                    },
4333                };
4334
4335                if let Some(ch) = produced {
4336                    if dense_prefix && new_idx == dense.len() {
4337                        dense.push(ch);
4338                    } else {
4339                        sparse.insert(new_idx, ch);
4340                        dense_prefix = false;
4341                    }
4342                } else if dense_prefix && new_idx == dense.len() {
4343                    dense_prefix = false;
4344                }
4345            }
4346
4347            col.chunks = dense;
4348            col.sparse_chunks = sparse;
4349        }
4350    }
4351
4352    /// Delete `count` rows starting from absolute 0-based row `start`.
4353    pub fn delete_rows(&mut self, start: usize, count: usize) {
4354        if count == 0 || self.nrows == 0 {
4355            return;
4356        }
4357
4358        let total_rows = self.nrows as usize;
4359        if start >= total_rows {
4360            return;
4361        }
4362        let end = (start + count).min(total_rows);
4363        let del_len = end.saturating_sub(start);
4364        if del_len == 0 {
4365            return;
4366        }
4367
4368        // Ensure a valid chunk map for non-empty sheets.
4369        if total_rows > 0 && self.chunk_starts.is_empty() {
4370            self.chunk_starts.push(0);
4371        }
4372
4373        // "Dense" mode: every column has every chunk (legacy invariant).
4374        let dense_aligned = self
4375            .columns
4376            .iter()
4377            .all(|c| c.sparse_chunks.is_empty() && c.chunks.len() == self.chunk_starts.len());
4378
4379        if dense_aligned {
4380            // Dense rebuild by slicing out the deleted window.
4381            for col in &mut self.columns {
4382                let mut new_chunks: Vec<ColumnChunk> = Vec::new();
4383                let mut cur_start = 0usize;
4384                for ch in &col.chunks {
4385                    let len = ch.type_tag.len();
4386                    let ch_end = cur_start + len;
4387                    // No overlap
4388                    if ch_end <= start || cur_start >= end {
4389                        new_chunks.push(ch.clone());
4390                    } else {
4391                        // Overlap exists
4392                        let del_start = start.max(cur_start);
4393                        let del_end = end.min(ch_end);
4394                        let left_len = del_start.saturating_sub(cur_start);
4395                        let right_len = ch_end.saturating_sub(del_end);
4396                        if left_len > 0 {
4397                            new_chunks.push(Self::slice_chunk(ch, 0, left_len));
4398                        }
4399                        if right_len > 0 {
4400                            let off = len - right_len;
4401                            new_chunks.push(Self::slice_chunk(ch, off, right_len));
4402                        }
4403                    }
4404                    cur_start = ch_end;
4405                }
4406                col.chunks = new_chunks;
4407                col.sparse_chunks.clear();
4408            }
4409            self.nrows = (total_rows - del_len) as u32;
4410            self.recompute_chunk_starts();
4411            return;
4412        }
4413
4414        // Sparse-aware mode: `chunk_starts` is authoritative and missing chunks are treated as empty.
4415        #[derive(Clone, Copy)]
4416        enum PlanItem {
4417            Slice {
4418                old_idx: usize,
4419                off: usize,
4420                len: usize,
4421            },
4422        }
4423
4424        let mut plan: Vec<PlanItem> = Vec::with_capacity(self.chunk_starts.len());
4425        for old_idx in 0..self.chunk_starts.len() {
4426            let ch_start = self.chunk_starts[old_idx];
4427            let ch_end = self
4428                .chunk_starts
4429                .get(old_idx + 1)
4430                .copied()
4431                .unwrap_or(total_rows);
4432            let ch_len = ch_end.saturating_sub(ch_start);
4433            if ch_len == 0 {
4434                continue;
4435            }
4436
4437            // No overlap
4438            if ch_end <= start || ch_start >= end {
4439                plan.push(PlanItem::Slice {
4440                    old_idx,
4441                    off: 0,
4442                    len: ch_len,
4443                });
4444                continue;
4445            }
4446
4447            // Left remainder
4448            if start > ch_start {
4449                let left_end = start.min(ch_end);
4450                let left_len = left_end.saturating_sub(ch_start);
4451                if left_len > 0 {
4452                    plan.push(PlanItem::Slice {
4453                        old_idx,
4454                        off: 0,
4455                        len: left_len,
4456                    });
4457                }
4458            }
4459
4460            // Right remainder
4461            if end < ch_end {
4462                let right_off = end.saturating_sub(ch_start);
4463                let right_len = ch_end.saturating_sub(end);
4464                if right_len > 0 {
4465                    plan.push(PlanItem::Slice {
4466                        old_idx,
4467                        off: right_off,
4468                        len: right_len,
4469                    });
4470                }
4471            }
4472        }
4473
4474        let mut new_starts: Vec<usize> = Vec::with_capacity(plan.len());
4475        let mut cur = 0usize;
4476        for item in &plan {
4477            let len = match *item {
4478                PlanItem::Slice { len, .. } => len,
4479            };
4480            if len == 0 {
4481                continue;
4482            }
4483            new_starts.push(cur);
4484            cur = cur.saturating_add(len);
4485        }
4486
4487        debug_assert_eq!(cur, total_rows.saturating_sub(del_len));
4488
4489        // Update sheet row layout first.
4490        self.nrows = (total_rows - del_len) as u32;
4491        self.chunk_starts = new_starts;
4492
4493        // Rebuild stored chunks per column using the plan.
4494        for col in &mut self.columns {
4495            let old_dense = std::mem::take(&mut col.chunks);
4496            let old_sparse = std::mem::take(&mut col.sparse_chunks);
4497            let get_old = |idx: usize| -> Option<&ColumnChunk> {
4498                if idx < old_dense.len() {
4499                    Some(&old_dense[idx])
4500                } else {
4501                    old_sparse.get(&idx)
4502                }
4503            };
4504
4505            let mut dense: Vec<ColumnChunk> = Vec::new();
4506            let mut sparse: FxHashMap<usize, ColumnChunk> = FxHashMap::default();
4507            let mut dense_prefix = true;
4508
4509            for (new_idx, item) in plan.iter().enumerate() {
4510                let produced: Option<ColumnChunk> = match *item {
4511                    PlanItem::Slice { old_idx, off, len } => match get_old(old_idx) {
4512                        Some(orig) => {
4513                            if off == 0 && len == orig.type_tag.len() {
4514                                Some(orig.clone())
4515                            } else {
4516                                Some(Self::slice_chunk(orig, off, len))
4517                            }
4518                        }
4519                        None => None,
4520                    },
4521                };
4522
4523                if let Some(ch) = produced {
4524                    if dense_prefix && new_idx == dense.len() {
4525                        dense.push(ch);
4526                    } else {
4527                        sparse.insert(new_idx, ch);
4528                        dense_prefix = false;
4529                    }
4530                } else if dense_prefix && new_idx == dense.len() {
4531                    dense_prefix = false;
4532                }
4533            }
4534
4535            col.chunks = dense;
4536            col.sparse_chunks = sparse;
4537        }
4538    }
4539
4540    /// Insert `count` columns before absolute 0-based column `before` with empty chunks.
4541    pub fn insert_columns(&mut self, before: usize, count: usize) {
4542        if count == 0 {
4543            return;
4544        }
4545        // Determine chunk schema from first column if present
4546        let empty_col = |lens: &[usize]| -> ArrowColumn {
4547            let mut chunks = Vec::with_capacity(lens.len());
4548            for &l in lens {
4549                chunks.push(Self::make_empty_chunk(l));
4550            }
4551            ArrowColumn {
4552                chunks,
4553                sparse_chunks: FxHashMap::default(),
4554                index: 0,
4555            }
4556        };
4557        let dense_aligned = !self.columns.is_empty()
4558            && self
4559                .columns
4560                .iter()
4561                .all(|c| c.sparse_chunks.is_empty() && c.chunks.len() == self.chunk_starts.len());
4562
4563        let lens: Vec<usize> = if dense_aligned {
4564            self.columns[0]
4565                .chunks
4566                .iter()
4567                .map(|c| c.type_tag.len())
4568                .collect()
4569        } else if self.columns.is_empty() {
4570            // No columns: single chunk matching nrows if any
4571            if self.nrows > 0 {
4572                vec![self.nrows as usize]
4573            } else {
4574                Vec::new()
4575            }
4576        } else {
4577            // Sparse sheet: keep inserted columns cheap by materializing no chunks.
4578            Vec::new()
4579        };
4580        let mut cols_new: Vec<ArrowColumn> = Vec::with_capacity(self.columns.len() + count);
4581        let before_idx = before.min(self.columns.len());
4582        for (i, col) in self.columns.iter_mut().enumerate() {
4583            if i == before_idx {
4584                for _ in 0..count {
4585                    cols_new.push(empty_col(&lens));
4586                }
4587            }
4588            cols_new.push(col.clone());
4589        }
4590        if before_idx == self.columns.len() {
4591            for _ in 0..count {
4592                cols_new.push(empty_col(&lens));
4593            }
4594        }
4595        // Fix column indices
4596        for (idx, col) in cols_new.iter_mut().enumerate() {
4597            col.index = idx as u32;
4598        }
4599        self.columns = cols_new;
4600        // chunk_starts unchanged; lens were matched
4601    }
4602
4603    /// Delete `count` columns starting at absolute 0-based column `start`.
4604    pub fn delete_columns(&mut self, start: usize, count: usize) {
4605        if count == 0 || self.columns.is_empty() {
4606            return;
4607        }
4608        let end = (start + count).min(self.columns.len());
4609        if start >= end {
4610            return;
4611        }
4612        self.columns.drain(start..end);
4613        for (idx, col) in self.columns.iter_mut().enumerate() {
4614            col.index = idx as u32;
4615        }
4616    }
4617}
4618
4619#[derive(Debug, Clone, Copy)]
4620pub struct ColumnShape {
4621    pub index: u32,
4622    pub chunks: usize,
4623    pub rows: usize,
4624    pub has_num: bool,
4625    pub has_bool: bool,
4626    pub has_text: bool,
4627    pub has_err: bool,
4628}
4629
4630#[cfg(test)]
4631mod tests {
4632    use super::*;
4633    use arrow_array::Array;
4634    use arrow_schema::DataType;
4635    use chrono::Datelike;
4636
4637    fn add_overlay_stats(into: &mut OverlayDebugStats, next: OverlayDebugStats) {
4638        into.points += next.points;
4639        into.sparse_fragments += next.sparse_fragments;
4640        into.dense_fragments += next.dense_fragments;
4641        into.run_fragments += next.run_fragments;
4642        into.covered_len += next.covered_len;
4643    }
4644
4645    fn column_overlay_stats(
4646        sheet: &ArrowSheet,
4647        col_idx: usize,
4648        computed: bool,
4649    ) -> OverlayDebugStats {
4650        let mut stats = OverlayDebugStats::default();
4651        let Some(column) = sheet.columns.get(col_idx) else {
4652            return stats;
4653        };
4654        for chunk in &column.chunks {
4655            add_overlay_stats(
4656                &mut stats,
4657                if computed {
4658                    chunk.computed_overlay.debug_stats()
4659                } else {
4660                    chunk.overlay.debug_stats()
4661                },
4662            );
4663        }
4664        for chunk in column.sparse_chunks.values() {
4665            add_overlay_stats(
4666                &mut stats,
4667                if computed {
4668                    chunk.computed_overlay.debug_stats()
4669                } else {
4670                    chunk.overlay.debug_stats()
4671                },
4672            );
4673        }
4674        stats
4675    }
4676
4677    fn assert_column_overlays_normalized(sheet: &ArrowSheet, col_idx: usize) {
4678        let column = &sheet.columns[col_idx];
4679        for chunk in &column.chunks {
4680            assert!(chunk.overlay.debug_is_normalized());
4681            assert!(chunk.computed_overlay.debug_is_normalized());
4682            assert_eq!(
4683                chunk.overlay.estimated_bytes(),
4684                chunk.overlay.debug_recomputed_estimated_bytes()
4685            );
4686            assert_eq!(
4687                chunk.computed_overlay.estimated_bytes(),
4688                chunk.computed_overlay.debug_recomputed_estimated_bytes()
4689            );
4690        }
4691        for chunk in column.sparse_chunks.values() {
4692            assert!(chunk.overlay.debug_is_normalized());
4693            assert!(chunk.computed_overlay.debug_is_normalized());
4694            assert_eq!(
4695                chunk.overlay.estimated_bytes(),
4696                chunk.overlay.debug_recomputed_estimated_bytes()
4697            );
4698            assert_eq!(
4699                chunk.computed_overlay.estimated_bytes(),
4700                chunk.computed_overlay.debug_recomputed_estimated_bytes()
4701            );
4702        }
4703    }
4704
4705    fn column_computed_overlay_estimated_bytes(sheet: &ArrowSheet, col_idx: usize) -> usize {
4706        let Some(column) = sheet.columns.get(col_idx) else {
4707            return 0;
4708        };
4709        column
4710            .chunks
4711            .iter()
4712            .map(|chunk| chunk.computed_overlay.estimated_bytes())
4713            .chain(
4714                column
4715                    .sparse_chunks
4716                    .values()
4717                    .map(|chunk| chunk.computed_overlay.estimated_bytes()),
4718            )
4719            .fold(0usize, usize::saturating_add)
4720    }
4721
4722    #[derive(Debug, Clone, Copy)]
4723    enum Phase4ProbeFixture {
4724        PointNumeric,
4725        DenseNumeric,
4726        RunNumeric,
4727        SparseNumeric,
4728        EmptyRun,
4729        MixedDense,
4730    }
4731
4732    impl Phase4ProbeFixture {
4733        fn name(self) -> &'static str {
4734            match self {
4735                Phase4ProbeFixture::PointNumeric => "point_numeric",
4736                Phase4ProbeFixture::DenseNumeric => "dense_numeric",
4737                Phase4ProbeFixture::RunNumeric => "run_numeric",
4738                Phase4ProbeFixture::SparseNumeric => "sparse_numeric",
4739                Phase4ProbeFixture::EmptyRun => "empty_run",
4740                Phase4ProbeFixture::MixedDense => "mixed_dense",
4741            }
4742        }
4743    }
4744
4745    #[derive(Debug, serde::Serialize)]
4746    struct Phase4ProbeOp {
4747        ms: f64,
4748        segments: usize,
4749        arrays: usize,
4750        rows_scanned: usize,
4751        checksum: f64,
4752        non_null: usize,
4753    }
4754
4755    #[derive(Debug, serde::Serialize)]
4756    struct Phase4ProbeRow {
4757        fixture: &'static str,
4758        rows: usize,
4759        points: usize,
4760        sparse_fragments: usize,
4761        dense_fragments: usize,
4762        run_fragments: usize,
4763        covered_len: usize,
4764        overlay_estimated_bytes: usize,
4765        numbers: Phase4ProbeOp,
4766        type_tags: Phase4ProbeOp,
4767        lowered_text: Phase4ProbeOp,
4768        get_cell_scan: Phase4ProbeOp,
4769        select_stats: OverlaySelectStats,
4770    }
4771
4772    fn build_phase4_probe_sheet(rows: usize, fixture: Phase4ProbeFixture) -> ArrowSheet {
4773        let mut builder =
4774            IngestBuilder::new("S", 1, rows.max(1), crate::engine::DateSystem::Excel1900);
4775        for row in 0..rows {
4776            builder
4777                .append_row(&[LiteralValue::Number((row + 1) as f64)])
4778                .unwrap();
4779        }
4780        let mut sheet = builder.finish();
4781        let chunk = sheet.columns[0].chunk_mut(0).unwrap();
4782        match fixture {
4783            Phase4ProbeFixture::PointNumeric => {
4784                for row in 0..rows {
4785                    chunk
4786                        .computed_overlay
4787                        .set_scalar(row, OverlayValue::Number((row + 1) as f64));
4788                }
4789            }
4790            Phase4ProbeFixture::DenseNumeric => {
4791                chunk.computed_overlay.apply_fragment(
4792                    OverlayFragment::dense_range(
4793                        0,
4794                        (0..rows)
4795                            .map(|row| OverlayValue::Number((row + 1) as f64))
4796                            .collect(),
4797                    )
4798                    .unwrap(),
4799                );
4800            }
4801            Phase4ProbeFixture::RunNumeric => {
4802                chunk.computed_overlay.apply_fragment(
4803                    OverlayFragment::run_range(0, vec![OverlayValue::Number(1.0); rows]).unwrap(),
4804                );
4805            }
4806            Phase4ProbeFixture::SparseNumeric => {
4807                chunk.computed_overlay.apply_fragment(
4808                    OverlayFragment::sparse_offsets(
4809                        (0..rows)
4810                            .step_by(10)
4811                            .map(|row| (row, OverlayValue::Number(10.0)))
4812                            .collect(),
4813                    )
4814                    .unwrap(),
4815                );
4816            }
4817            Phase4ProbeFixture::EmptyRun => {
4818                chunk.computed_overlay.apply_fragment(
4819                    OverlayFragment::run_range(0, vec![OverlayValue::Empty; rows]).unwrap(),
4820                );
4821            }
4822            Phase4ProbeFixture::MixedDense => {
4823                let pattern = [
4824                    OverlayValue::Number(1.0),
4825                    OverlayValue::Boolean(true),
4826                    OverlayValue::Text(Arc::from("Alpha")),
4827                    OverlayValue::Empty,
4828                    OverlayValue::Error(map_error_code(ExcelErrorKind::Div)),
4829                    OverlayValue::Pending,
4830                    OverlayValue::DateTime(45000.25),
4831                    OverlayValue::Duration(0.5),
4832                ];
4833                chunk.computed_overlay.apply_fragment(
4834                    OverlayFragment::dense_range(
4835                        0,
4836                        (0..rows)
4837                            .map(|row| pattern[row % pattern.len()].clone())
4838                            .collect(),
4839                    )
4840                    .unwrap(),
4841                );
4842            }
4843        }
4844        sheet
4845    }
4846
4847    fn measure_probe_numbers(sheet: &ArrowSheet, rows: usize) -> Phase4ProbeOp {
4848        let view = sheet.range_view(0, 0, rows.saturating_sub(1), 0);
4849        let start = std::time::Instant::now();
4850        let mut segments = 0usize;
4851        let mut arrays = 0usize;
4852        let mut rows_scanned = 0usize;
4853        let mut checksum = 0.0;
4854        let mut non_null = 0usize;
4855        for segment in view.numbers_slices() {
4856            let (_row_start, row_len, cols) = segment.unwrap();
4857            segments += 1;
4858            rows_scanned += row_len;
4859            for array in cols {
4860                arrays += 1;
4861                for idx in 0..array.len() {
4862                    if array.is_valid(idx) {
4863                        checksum += array.value(idx);
4864                        non_null += 1;
4865                    }
4866                }
4867            }
4868        }
4869        Phase4ProbeOp {
4870            ms: start.elapsed().as_secs_f64() * 1000.0,
4871            segments,
4872            arrays,
4873            rows_scanned,
4874            checksum,
4875            non_null,
4876        }
4877    }
4878
4879    fn measure_probe_type_tags(sheet: &ArrowSheet, rows: usize) -> Phase4ProbeOp {
4880        let view = sheet.range_view(0, 0, rows.saturating_sub(1), 0);
4881        let start = std::time::Instant::now();
4882        let mut segments = 0usize;
4883        let mut arrays = 0usize;
4884        let mut rows_scanned = 0usize;
4885        let mut checksum = 0.0;
4886        let mut non_null = 0usize;
4887        for segment in view.type_tags_slices() {
4888            let (_row_start, row_len, cols) = segment.unwrap();
4889            segments += 1;
4890            rows_scanned += row_len;
4891            for array in cols {
4892                arrays += 1;
4893                for idx in 0..array.len() {
4894                    if array.is_valid(idx) {
4895                        checksum += array.value(idx) as f64;
4896                        non_null += 1;
4897                    }
4898                }
4899            }
4900        }
4901        Phase4ProbeOp {
4902            ms: start.elapsed().as_secs_f64() * 1000.0,
4903            segments,
4904            arrays,
4905            rows_scanned,
4906            checksum,
4907            non_null,
4908        }
4909    }
4910
4911    fn measure_probe_lowered_text(sheet: &ArrowSheet, rows: usize) -> Phase4ProbeOp {
4912        let view = sheet.range_view(0, 0, rows.saturating_sub(1), 0);
4913        let start = std::time::Instant::now();
4914        let mut segments = 0usize;
4915        let mut arrays = 0usize;
4916        let mut rows_scanned = 0usize;
4917        let mut checksum = 0.0;
4918        let mut non_null = 0usize;
4919        for segment in view.lowered_text_slices() {
4920            let (_row_start, row_len, cols) = segment.unwrap();
4921            segments += 1;
4922            rows_scanned += row_len;
4923            for array in cols {
4924                arrays += 1;
4925                for idx in 0..array.len() {
4926                    if array.is_valid(idx) {
4927                        checksum += array.value(idx).len() as f64;
4928                        non_null += 1;
4929                    }
4930                }
4931            }
4932        }
4933        Phase4ProbeOp {
4934            ms: start.elapsed().as_secs_f64() * 1000.0,
4935            segments,
4936            arrays,
4937            rows_scanned,
4938            checksum,
4939            non_null,
4940        }
4941    }
4942
4943    fn literal_probe_weight(value: LiteralValue) -> f64 {
4944        match value {
4945            LiteralValue::Empty => 0.0,
4946            LiteralValue::Int(value) => value as f64,
4947            LiteralValue::Number(value) => value,
4948            LiteralValue::Boolean(value) => {
4949                if value {
4950                    1.0
4951                } else {
4952                    0.0
4953                }
4954            }
4955            LiteralValue::Text(value) => value.len() as f64,
4956            LiteralValue::Error(_) => -1.0,
4957            LiteralValue::Date(value) => value.num_days_from_ce() as f64,
4958            LiteralValue::DateTime(value) => value.and_utc().timestamp() as f64,
4959            LiteralValue::Time(value) => value.num_seconds_from_midnight() as f64,
4960            LiteralValue::Duration(value) => value.num_seconds() as f64,
4961            LiteralValue::Array(values) => values.len() as f64,
4962            LiteralValue::Pending => -2.0,
4963        }
4964    }
4965
4966    fn measure_probe_get_cell(sheet: &ArrowSheet, rows: usize) -> Phase4ProbeOp {
4967        let view = sheet.range_view(0, 0, rows.saturating_sub(1), 0);
4968        let start = std::time::Instant::now();
4969        let mut checksum = 0.0;
4970        for row in 0..rows {
4971            checksum += literal_probe_weight(view.get_cell(row, 0));
4972        }
4973        Phase4ProbeOp {
4974            ms: start.elapsed().as_secs_f64() * 1000.0,
4975            segments: 1,
4976            arrays: 0,
4977            rows_scanned: rows,
4978            checksum,
4979            non_null: rows,
4980        }
4981    }
4982
4983    fn run_phase4_probe_fixture(rows: usize, fixture: Phase4ProbeFixture) -> Phase4ProbeRow {
4984        let sheet = build_phase4_probe_sheet(rows, fixture);
4985        assert_column_overlays_normalized(&sheet, 0);
4986        let stats = column_overlay_stats(&sheet, 0, true);
4987        reset_overlay_select_stats();
4988        let numbers = measure_probe_numbers(&sheet, rows);
4989        let type_tags = measure_probe_type_tags(&sheet, rows);
4990        let lowered_text = measure_probe_lowered_text(&sheet, rows);
4991        let select_stats = snapshot_overlay_select_stats();
4992        let get_cell_scan = measure_probe_get_cell(&sheet, rows);
4993        Phase4ProbeRow {
4994            fixture: fixture.name(),
4995            rows,
4996            points: stats.points,
4997            sparse_fragments: stats.sparse_fragments,
4998            dense_fragments: stats.dense_fragments,
4999            run_fragments: stats.run_fragments,
5000            covered_len: stats.covered_len,
5001            overlay_estimated_bytes: column_computed_overlay_estimated_bytes(&sheet, 0),
5002            numbers,
5003            type_tags,
5004            lowered_text,
5005            get_cell_scan,
5006            select_stats,
5007        }
5008    }
5009
5010    #[test]
5011    #[ignore = "manual Phase 4 observability probe; run with --ignored --nocapture"]
5012    fn phase4_overlay_rangeview_observability_probe() {
5013        let rows = std::env::var("FORMUALIZER_OVERLAY_PROBE_ROWS")
5014            .ok()
5015            .and_then(|value| value.parse::<usize>().ok())
5016            .unwrap_or(100_000)
5017            .max(1);
5018        for fixture in [
5019            Phase4ProbeFixture::PointNumeric,
5020            Phase4ProbeFixture::DenseNumeric,
5021            Phase4ProbeFixture::RunNumeric,
5022            Phase4ProbeFixture::SparseNumeric,
5023            Phase4ProbeFixture::EmptyRun,
5024            Phase4ProbeFixture::MixedDense,
5025        ] {
5026            let row = run_phase4_probe_fixture(rows, fixture);
5027            println!("{}", serde_json::to_string(&row).unwrap());
5028        }
5029    }
5030
5031    #[test]
5032    fn ingest_mixed_rows_into_lanes_and_tags() {
5033        let mut b = IngestBuilder::new("Sheet1", 1, 1024, crate::engine::DateSystem::Excel1900);
5034        let data = vec![
5035            LiteralValue::Number(42.5),                   // Number
5036            LiteralValue::Empty,                          // Empty
5037            LiteralValue::Text(String::new()),            // Empty text (Text lane)
5038            LiteralValue::Boolean(true),                  // Boolean
5039            LiteralValue::Error(ExcelError::new_value()), // Error
5040        ];
5041        for v in &data {
5042            b.append_row(std::slice::from_ref(v)).unwrap();
5043        }
5044        let sheet = b.finish();
5045        assert_eq!(sheet.nrows, 5);
5046        assert_eq!(sheet.columns.len(), 1);
5047        assert_eq!(sheet.columns[0].chunks.len(), 1);
5048        let ch = &sheet.columns[0].chunks[0];
5049
5050        // Type tags
5051        let tags = ch.type_tag.values();
5052        assert_eq!(tags.len(), 5);
5053        assert_eq!(tags[0], TypeTag::Number as u8);
5054        assert_eq!(tags[1], TypeTag::Empty as u8);
5055        assert_eq!(tags[2], TypeTag::Text as u8);
5056        assert_eq!(tags[3], TypeTag::Boolean as u8);
5057        assert_eq!(tags[4], TypeTag::Error as u8);
5058
5059        // Numbers lane validity
5060        let nums = ch.numbers.as_ref().unwrap();
5061        assert_eq!(nums.len(), 5);
5062        assert_eq!(nums.null_count(), 4);
5063        assert!(nums.is_valid(0));
5064
5065        // Booleans lane validity
5066        let bools = ch.booleans.as_ref().unwrap();
5067        assert_eq!(bools.len(), 5);
5068        assert_eq!(bools.null_count(), 4);
5069        assert!(bools.is_valid(3));
5070
5071        // Text lane validity
5072        let txt = ch.text.as_ref().unwrap();
5073        assert_eq!(txt.len(), 5);
5074        assert_eq!(txt.null_count(), 4);
5075        assert!(txt.is_valid(2)); // ""
5076
5077        // Errors lane
5078        let errs = ch.errors.as_ref().unwrap();
5079        assert_eq!(errs.len(), 5);
5080        assert_eq!(errs.null_count(), 4);
5081        assert!(errs.is_valid(4));
5082    }
5083
5084    #[test]
5085    fn range_view_get_cell_and_padding() {
5086        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
5087        b.append_row(&[LiteralValue::Number(1.0), LiteralValue::Text("".into())])
5088            .unwrap();
5089        b.append_row(&[LiteralValue::Empty, LiteralValue::Text("x".into())])
5090            .unwrap();
5091        b.append_row(&[LiteralValue::Boolean(true), LiteralValue::Empty])
5092            .unwrap();
5093        let sheet = b.finish();
5094        let rv = sheet.range_view(0, 0, 2, 1);
5095        assert_eq!(rv.dims(), (3, 2));
5096        // Inside
5097        assert_eq!(rv.get_cell(0, 0), LiteralValue::Number(1.0));
5098        assert_eq!(rv.get_cell(0, 1), LiteralValue::Text(String::new())); // empty string
5099        assert_eq!(rv.get_cell(1, 0), LiteralValue::Empty); // truly Empty
5100        assert_eq!(rv.get_cell(2, 0), LiteralValue::Boolean(true));
5101        // OOB padding
5102        assert_eq!(rv.get_cell(3, 0), LiteralValue::Empty);
5103        assert_eq!(rv.get_cell(0, 2), LiteralValue::Empty);
5104
5105        // Numbers slices should produce one 2-row and one 1-row segment
5106        let nums: Vec<_> = rv.numbers_slices().map(|r| r.unwrap()).collect();
5107        assert_eq!(nums.len(), 2);
5108        assert_eq!(nums[0].0, 0);
5109        assert_eq!(nums[0].1, 2);
5110        assert_eq!(nums[1].0, 2);
5111        assert_eq!(nums[1].1, 1);
5112    }
5113
5114    #[test]
5115    fn overlay_precedence_user_over_computed() {
5116        let mut b = IngestBuilder::new("S", 1, 8, crate::engine::DateSystem::Excel1900);
5117        b.append_row(&[LiteralValue::Number(1.0)]).unwrap();
5118        b.append_row(&[LiteralValue::Empty]).unwrap();
5119        b.append_row(&[LiteralValue::Empty]).unwrap();
5120        let mut sheet = b.finish();
5121
5122        let (ch_i, off) = sheet.chunk_of_row(0).unwrap();
5123        sheet.columns[0].chunks[ch_i]
5124            .computed_overlay
5125            .set(off, OverlayValue::Number(2.0));
5126
5127        let rv0 = sheet.range_view(0, 0, 0, 0);
5128        assert_eq!(rv0.get_cell(0, 0), LiteralValue::Number(2.0));
5129        let nums0: Vec<_> = rv0.numbers_slices().map(|r| r.unwrap()).collect();
5130        assert_eq!(nums0.len(), 1);
5131        assert_eq!(nums0[0].2[0].value(0), 2.0);
5132
5133        sheet.columns[0].chunks[ch_i]
5134            .overlay
5135            .set(off, OverlayValue::Number(3.0));
5136
5137        let rv1 = sheet.range_view(0, 0, 0, 0);
5138        assert_eq!(rv1.get_cell(0, 0), LiteralValue::Number(3.0));
5139        let nums1: Vec<_> = rv1.numbers_slices().map(|r| r.unwrap()).collect();
5140        assert_eq!(nums1.len(), 1);
5141        assert_eq!(nums1[0].2[0].value(0), 3.0);
5142    }
5143
5144    #[test]
5145    fn overlay_slice_preserves_explicit_empty_and_offsets() {
5146        let mut overlay = Overlay::new();
5147        overlay.set(2, OverlayValue::Number(2.0));
5148        overlay.set(4, OverlayValue::Empty);
5149        overlay.set(6, OverlayValue::Text(Arc::from("outside")));
5150
5151        let sliced = overlay.slice(1, 4);
5152        assert!(sliced.get_scalar(0).is_none());
5153        assert_eq!(
5154            sliced.get_scalar(1).unwrap().to_literal(),
5155            LiteralValue::Number(2.0)
5156        );
5157        assert_eq!(
5158            sliced.get_scalar(3).unwrap().to_literal(),
5159            LiteralValue::Empty
5160        );
5161        assert!(sliced.get_scalar(5).is_none());
5162    }
5163
5164    #[test]
5165    fn overlay_cascade_user_empty_masks_computed_and_base() {
5166        let mut user = Overlay::new();
5167        let mut computed = Overlay::new();
5168        computed.set(1, OverlayValue::Number(42.0));
5169        user.set(1, OverlayValue::Empty);
5170
5171        let cascade = OverlayCascade::new(&user, &computed);
5172        assert_eq!(
5173            cascade.get_scalar(1).unwrap().to_literal(),
5174            LiteralValue::Empty
5175        );
5176        assert!(cascade.has_any_in_range(1..2));
5177    }
5178
5179    #[test]
5180    fn overlay_storage_pointmap_backward_compat_get_set_remove() {
5181        let mut overlay = Overlay::new();
5182        assert!(overlay.is_empty());
5183
5184        let delta = overlay.set_scalar(1, OverlayValue::Number(10.0));
5185        assert!(delta > 0);
5186        assert_eq!(overlay.len(), 1);
5187        assert_eq!(
5188            overlay.get_scalar(1).unwrap().to_literal(),
5189            LiteralValue::Number(10.0)
5190        );
5191
5192        let replace_delta = overlay.set_scalar(1, OverlayValue::Text(Arc::from("x")));
5193        assert_ne!(replace_delta, 0);
5194        assert_eq!(overlay.len(), 1);
5195        assert_eq!(
5196            overlay.get_scalar(1).unwrap().to_literal(),
5197            LiteralValue::Text("x".into())
5198        );
5199
5200        let remove_delta = overlay.remove_scalar(1);
5201        assert!(remove_delta < 0);
5202        assert!(overlay.is_empty());
5203        assert!(overlay.get_scalar(1).is_none());
5204    }
5205
5206    #[test]
5207    fn overlay_remove_range_splits_fragments_and_points() {
5208        let mut overlay = Overlay::new();
5209        overlay.set_scalar(2, OverlayValue::Number(20.0));
5210        overlay.apply_fragment(
5211            OverlayFragment::dense_range(
5212                0,
5213                (0..6)
5214                    .map(|i| OverlayValue::Number(i as f64))
5215                    .collect::<Vec<_>>(),
5216            )
5217            .unwrap(),
5218        );
5219        overlay.set_scalar(3, OverlayValue::Number(30.0));
5220        overlay.set_scalar(8, OverlayValue::Number(80.0));
5221
5222        let delta = overlay.remove_range(2..5);
5223
5224        assert!(delta < 0);
5225        assert_eq!(
5226            overlay.get_scalar(0).unwrap().to_literal(),
5227            LiteralValue::Number(0.0)
5228        );
5229        assert_eq!(
5230            overlay.get_scalar(1).unwrap().to_literal(),
5231            LiteralValue::Number(1.0)
5232        );
5233        assert!(overlay.get_scalar(2).is_none());
5234        assert!(overlay.get_scalar(3).is_none());
5235        assert!(overlay.get_scalar(4).is_none());
5236        assert_eq!(
5237            overlay.get_scalar(5).unwrap().to_literal(),
5238            LiteralValue::Number(5.0)
5239        );
5240        assert_eq!(
5241            overlay.get_scalar(8).unwrap().to_literal(),
5242            LiteralValue::Number(80.0)
5243        );
5244        assert!(overlay.debug_is_normalized());
5245        assert_eq!(
5246            overlay.estimated_bytes(),
5247            overlay.debug_recomputed_estimated_bytes()
5248        );
5249    }
5250
5251    #[test]
5252    fn overlay_storage_no_fragments_behavior_matches_old_map() {
5253        let mut overlay = Overlay::new();
5254        overlay.set_scalar(0, OverlayValue::Number(1.0));
5255        overlay.set_scalar(3, OverlayValue::Empty);
5256
5257        assert!(overlay.has_any_in_range(0..1));
5258        assert!(!overlay.has_any_in_range(1..3));
5259        assert!(overlay.has_any_in_range(3..4));
5260
5261        let sliced = overlay.slice(2, 3);
5262        assert!(sliced.get_scalar(0).is_none());
5263        assert_eq!(
5264            sliced.get_scalar(1).unwrap().to_literal(),
5265            LiteralValue::Empty
5266        );
5267    }
5268
5269    #[test]
5270    fn overlay_cascade_user_layer_masks_computed_fragment_regardless_of_sequence() {
5271        let mut user = Overlay::new();
5272        let mut computed = Overlay::new();
5273
5274        user.set_scalar(0, OverlayValue::Number(3.0));
5275        computed.apply_fragment(
5276            OverlayFragment::dense_range(0, vec![OverlayValue::Number(2.0)]).unwrap(),
5277        );
5278
5279        let cascade = OverlayCascade::new(&user, &computed);
5280        assert_eq!(
5281            cascade.get_scalar(0).unwrap().to_literal(),
5282            LiteralValue::Number(3.0)
5283        );
5284    }
5285
5286    #[test]
5287    fn overlay_same_layer_later_point_replaces_fragment_cell() {
5288        let mut overlay = Overlay::new();
5289        overlay.apply_fragment(
5290            OverlayFragment::dense_range(
5291                0,
5292                vec![
5293                    OverlayValue::Number(1.0),
5294                    OverlayValue::Number(2.0),
5295                    OverlayValue::Number(3.0),
5296                ],
5297            )
5298            .unwrap(),
5299        );
5300
5301        overlay.set_scalar(1, OverlayValue::Number(99.0));
5302
5303        assert_eq!(
5304            overlay.get_scalar(0).unwrap().to_literal(),
5305            LiteralValue::Number(1.0)
5306        );
5307        assert_eq!(
5308            overlay.get_scalar(1).unwrap().to_literal(),
5309            LiteralValue::Number(99.0)
5310        );
5311        assert_eq!(
5312            overlay.get_scalar(2).unwrap().to_literal(),
5313            LiteralValue::Number(3.0)
5314        );
5315    }
5316
5317    #[test]
5318    fn overlay_same_layer_later_fragment_replaces_point_range() {
5319        let mut overlay = Overlay::new();
5320        overlay.set_scalar(0, OverlayValue::Number(1.0));
5321        overlay.set_scalar(1, OverlayValue::Number(2.0));
5322        overlay.set_scalar(2, OverlayValue::Number(3.0));
5323
5324        overlay.apply_fragment(
5325            OverlayFragment::dense_range(
5326                0,
5327                vec![
5328                    OverlayValue::Number(10.0),
5329                    OverlayValue::Number(20.0),
5330                    OverlayValue::Number(30.0),
5331                ],
5332            )
5333            .unwrap(),
5334        );
5335
5336        let stats = overlay.debug_stats();
5337        assert_eq!(stats.points, 0);
5338        assert_eq!(stats.dense_fragments, 1);
5339        assert!(overlay.debug_is_normalized());
5340        assert_eq!(
5341            overlay.get_scalar(0).unwrap().to_literal(),
5342            LiteralValue::Number(10.0)
5343        );
5344        assert_eq!(
5345            overlay.get_scalar(1).unwrap().to_literal(),
5346            LiteralValue::Number(20.0)
5347        );
5348        assert_eq!(
5349            overlay.get_scalar(2).unwrap().to_literal(),
5350            LiteralValue::Number(30.0)
5351        );
5352    }
5353
5354    #[test]
5355    fn overlay_sparse_far_apart_replacement_does_not_rewrite_unrelated_dense_fragment() {
5356        let mut overlay = Overlay::new();
5357        overlay.apply_fragment(
5358            OverlayFragment::dense_range(100, vec![OverlayValue::Number(1.0); 10]).unwrap(),
5359        );
5360
5361        overlay.apply_fragment(
5362            OverlayFragment::sparse_offsets(vec![
5363                (0, OverlayValue::Empty),
5364                (1000, OverlayValue::Number(1000.0)),
5365            ])
5366            .unwrap(),
5367        );
5368
5369        let stats = overlay.debug_stats();
5370        assert_eq!(stats.dense_fragments, 1);
5371        assert_eq!(stats.sparse_fragments, 1);
5372        assert_eq!(stats.run_fragments, 0);
5373        assert!(overlay.debug_is_normalized());
5374        assert_eq!(
5375            overlay.get_scalar(105).unwrap().to_literal(),
5376            LiteralValue::Number(1.0)
5377        );
5378        assert_eq!(
5379            overlay.get_scalar(0).unwrap().to_literal(),
5380            LiteralValue::Empty
5381        );
5382        assert_eq!(
5383            overlay.get_scalar(1000).unwrap().to_literal(),
5384            LiteralValue::Number(1000.0)
5385        );
5386    }
5387
5388    #[test]
5389    fn overlay_sparse_offsets_are_sorted_unique_last_write_wins() {
5390        let mut overlay = Overlay::new();
5391        overlay.apply_fragment(
5392            OverlayFragment::sparse_offsets(vec![
5393                (3, OverlayValue::Number(3.0)),
5394                (1, OverlayValue::Number(1.0)),
5395                (3, OverlayValue::Number(33.0)),
5396            ])
5397            .unwrap(),
5398        );
5399
5400        let stats = overlay.debug_stats();
5401        assert_eq!(stats.sparse_fragments, 1);
5402        assert_eq!(overlay.len(), 2);
5403        assert_eq!(
5404            overlay.get_scalar(1).unwrap().to_literal(),
5405            LiteralValue::Number(1.0)
5406        );
5407        assert_eq!(
5408            overlay.get_scalar(3).unwrap().to_literal(),
5409            LiteralValue::Number(33.0)
5410        );
5411        assert!(overlay.debug_is_normalized());
5412    }
5413
5414    #[test]
5415    fn overlay_dense_point_replacement_splits_dense_not_sparse() {
5416        let mut overlay = Overlay::new();
5417        overlay.apply_fragment(
5418            OverlayFragment::dense_range(
5419                0,
5420                (0..6)
5421                    .map(|i| OverlayValue::Number(i as f64))
5422                    .collect::<Vec<_>>(),
5423            )
5424            .unwrap(),
5425        );
5426
5427        overlay.set_scalar(3, OverlayValue::Number(99.0));
5428
5429        let stats = overlay.debug_stats();
5430        assert_eq!(stats.points, 1);
5431        assert_eq!(stats.dense_fragments, 2);
5432        assert_eq!(stats.sparse_fragments, 0);
5433        assert!(overlay.debug_is_normalized());
5434        assert_eq!(
5435            overlay.get_scalar(2).unwrap().to_literal(),
5436            LiteralValue::Number(2.0)
5437        );
5438        assert_eq!(
5439            overlay.get_scalar(3).unwrap().to_literal(),
5440            LiteralValue::Number(99.0)
5441        );
5442        assert_eq!(
5443            overlay.get_scalar(4).unwrap().to_literal(),
5444            LiteralValue::Number(4.0)
5445        );
5446    }
5447
5448    #[test]
5449    fn overlay_dense_fragment_replacement_splits_left_and_right_dense() {
5450        let mut overlay = Overlay::new();
5451        overlay.apply_fragment(
5452            OverlayFragment::dense_range(
5453                0,
5454                (0..8)
5455                    .map(|i| OverlayValue::Number(i as f64))
5456                    .collect::<Vec<_>>(),
5457            )
5458            .unwrap(),
5459        );
5460
5461        overlay.apply_fragment(
5462            OverlayFragment::dense_range(
5463                3,
5464                vec![OverlayValue::Number(30.0), OverlayValue::Number(40.0)],
5465            )
5466            .unwrap(),
5467        );
5468
5469        let stats = overlay.debug_stats();
5470        assert_eq!(stats.points, 0);
5471        assert_eq!(stats.dense_fragments, 3);
5472        assert_eq!(stats.sparse_fragments, 0);
5473        assert!(overlay.debug_is_normalized());
5474        assert_eq!(
5475            overlay.get_scalar(2).unwrap().to_literal(),
5476            LiteralValue::Number(2.0)
5477        );
5478        assert_eq!(
5479            overlay.get_scalar(3).unwrap().to_literal(),
5480            LiteralValue::Number(30.0)
5481        );
5482        assert_eq!(
5483            overlay.get_scalar(4).unwrap().to_literal(),
5484            LiteralValue::Number(40.0)
5485        );
5486        assert_eq!(
5487            overlay.get_scalar(5).unwrap().to_literal(),
5488            LiteralValue::Number(5.0)
5489        );
5490    }
5491
5492    #[test]
5493    fn overlay_run_point_replacement_splits_run_not_sparse() {
5494        let mut overlay = Overlay::new();
5495        overlay.apply_fragment(
5496            OverlayFragment::run_range(0, vec![OverlayValue::Number(1.0); 10]).unwrap(),
5497        );
5498
5499        overlay.set_scalar(5, OverlayValue::Number(99.0));
5500
5501        let stats = overlay.debug_stats();
5502        assert_eq!(stats.points, 1);
5503        assert_eq!(stats.run_fragments, 2);
5504        assert_eq!(stats.sparse_fragments, 0);
5505        assert!(overlay.debug_is_normalized());
5506        assert_eq!(
5507            overlay.get_scalar(4).unwrap().to_literal(),
5508            LiteralValue::Number(1.0)
5509        );
5510        assert_eq!(
5511            overlay.get_scalar(5).unwrap().to_literal(),
5512            LiteralValue::Number(99.0)
5513        );
5514        assert_eq!(
5515            overlay.get_scalar(6).unwrap().to_literal(),
5516            LiteralValue::Number(1.0)
5517        );
5518    }
5519
5520    #[test]
5521    fn overlay_run_fragment_replacement_splits_left_and_right_run() {
5522        let mut overlay = Overlay::new();
5523        let values = [
5524            vec![OverlayValue::Number(1.0); 4],
5525            vec![OverlayValue::Number(2.0); 4],
5526            vec![OverlayValue::Number(3.0); 4],
5527        ]
5528        .concat();
5529        overlay.apply_fragment(OverlayFragment::run_range(0, values).unwrap());
5530
5531        overlay.apply_fragment(
5532            OverlayFragment::dense_range(
5533                5,
5534                vec![OverlayValue::Number(50.0), OverlayValue::Number(60.0)],
5535            )
5536            .unwrap(),
5537        );
5538
5539        let stats = overlay.debug_stats();
5540        assert_eq!(stats.run_fragments, 2);
5541        assert_eq!(stats.dense_fragments, 1);
5542        assert_eq!(stats.sparse_fragments, 0);
5543        assert!(overlay.debug_is_normalized());
5544        assert_eq!(
5545            overlay.get_scalar(4).unwrap().to_literal(),
5546            LiteralValue::Number(2.0)
5547        );
5548        assert_eq!(
5549            overlay.get_scalar(5).unwrap().to_literal(),
5550            LiteralValue::Number(50.0)
5551        );
5552        assert_eq!(
5553            overlay.get_scalar(6).unwrap().to_literal(),
5554            LiteralValue::Number(60.0)
5555        );
5556        assert_eq!(
5557            overlay.get_scalar(7).unwrap().to_literal(),
5558            LiteralValue::Number(2.0)
5559        );
5560    }
5561
5562    #[test]
5563    fn overlay_slice_preserves_dense_and_run_encodings() {
5564        let mut overlay = Overlay::new();
5565        overlay.apply_fragment(
5566            OverlayFragment::dense_range(
5567                10,
5568                (0..5)
5569                    .map(|i| OverlayValue::Number(i as f64))
5570                    .collect::<Vec<_>>(),
5571            )
5572            .unwrap(),
5573        );
5574        overlay.apply_fragment(
5575            OverlayFragment::run_range(
5576                20,
5577                [
5578                    vec![OverlayValue::Number(1.0); 3],
5579                    vec![OverlayValue::Number(2.0); 3],
5580                ]
5581                .concat(),
5582            )
5583            .unwrap(),
5584        );
5585
5586        let dense_slice = overlay.slice(12, 2);
5587        let dense_stats = dense_slice.debug_stats();
5588        assert_eq!(dense_stats.dense_fragments, 1);
5589        assert_eq!(dense_stats.sparse_fragments, 0);
5590        assert_eq!(
5591            dense_slice.get_scalar(0).unwrap().to_literal(),
5592            LiteralValue::Number(2.0)
5593        );
5594        assert_eq!(
5595            dense_slice.get_scalar(1).unwrap().to_literal(),
5596            LiteralValue::Number(3.0)
5597        );
5598        assert!(dense_slice.debug_is_normalized());
5599
5600        let run_slice = overlay.slice(22, 3);
5601        let run_stats = run_slice.debug_stats();
5602        assert_eq!(run_stats.run_fragments, 1);
5603        assert_eq!(run_stats.sparse_fragments, 0);
5604        assert_eq!(
5605            run_slice.get_scalar(0).unwrap().to_literal(),
5606            LiteralValue::Number(1.0)
5607        );
5608        assert_eq!(
5609            run_slice.get_scalar(1).unwrap().to_literal(),
5610            LiteralValue::Number(2.0)
5611        );
5612        assert_eq!(
5613            run_slice.get_scalar(2).unwrap().to_literal(),
5614            LiteralValue::Number(2.0)
5615        );
5616        assert!(run_slice.debug_is_normalized());
5617    }
5618
5619    #[test]
5620    fn overlay_computed_empty_run_masks_non_empty_base() {
5621        let mut b = IngestBuilder::new("S", 1, 8, crate::engine::DateSystem::Excel1900);
5622        b.append_row(&[LiteralValue::Number(1.0)]).unwrap();
5623        b.append_row(&[LiteralValue::Number(2.0)]).unwrap();
5624        b.append_row(&[LiteralValue::Number(3.0)]).unwrap();
5625        let mut sheet = b.finish();
5626
5627        let (ch_i, _) = sheet.chunk_of_row(0).unwrap();
5628        sheet.columns[0].chunks[ch_i]
5629            .computed_overlay
5630            .apply_fragment(
5631                OverlayFragment::run_range(
5632                    0,
5633                    vec![
5634                        OverlayValue::Empty,
5635                        OverlayValue::Empty,
5636                        OverlayValue::Empty,
5637                    ],
5638                )
5639                .unwrap(),
5640            );
5641
5642        assert_eq!(sheet.get_cell_value(0, 0), LiteralValue::Empty);
5643        assert_eq!(sheet.get_cell_value(1, 0), LiteralValue::Empty);
5644        assert_eq!(sheet.get_cell_value(2, 0), LiteralValue::Empty);
5645    }
5646
5647    #[test]
5648    fn overlay_fragments_reconstruct_scalars_from_typed_lanes() {
5649        let values = vec![
5650            OverlayValue::Empty,
5651            OverlayValue::Number(1.5),
5652            OverlayValue::DateTime(45000.25),
5653            OverlayValue::Duration(0.5),
5654            OverlayValue::Boolean(true),
5655            OverlayValue::Text(Arc::from("Hello")),
5656            OverlayValue::Error(map_error_code(ExcelErrorKind::Div)),
5657            OverlayValue::Pending,
5658        ];
5659
5660        let mut dense = Overlay::new();
5661        dense.apply_fragment(OverlayFragment::dense_range(0, values.clone()).unwrap());
5662        for (idx, expected) in values.iter().enumerate() {
5663            assert_eq!(
5664                dense.get_scalar(idx).unwrap().to_overlay_value(),
5665                expected.clone()
5666            );
5667        }
5668
5669        let mut sparse = Overlay::new();
5670        sparse.apply_fragment(
5671            OverlayFragment::sparse_offsets(
5672                values
5673                    .iter()
5674                    .cloned()
5675                    .enumerate()
5676                    .map(|(idx, value)| (idx * 2, value))
5677                    .collect(),
5678            )
5679            .unwrap(),
5680        );
5681        for (idx, expected) in values.iter().enumerate() {
5682            assert_eq!(
5683                sparse.get_scalar(idx * 2).unwrap().to_overlay_value(),
5684                expected.clone()
5685            );
5686        }
5687
5688        let mut run = Overlay::new();
5689        run.apply_fragment(
5690            OverlayFragment::run_range(
5691                0,
5692                vec![
5693                    OverlayValue::Number(7.0),
5694                    OverlayValue::Number(7.0),
5695                    OverlayValue::Text(Arc::from("run")),
5696                    OverlayValue::Text(Arc::from("run")),
5697                ],
5698            )
5699            .unwrap(),
5700        );
5701        assert_eq!(
5702            run.get_scalar(0).unwrap().to_overlay_value(),
5703            OverlayValue::Number(7.0)
5704        );
5705        assert_eq!(
5706            run.get_scalar(2).unwrap().to_overlay_value(),
5707            OverlayValue::Text(Arc::from("run"))
5708        );
5709    }
5710
5711    #[test]
5712    fn overlay_iter_returns_complete_logical_entries() {
5713        let mut overlay = Overlay::new();
5714        overlay.apply_fragment(
5715            OverlayFragment::dense_range(
5716                2,
5717                vec![OverlayValue::Number(2.0), OverlayValue::Number(3.0)],
5718            )
5719            .unwrap(),
5720        );
5721        overlay.set_scalar(5, OverlayValue::Text(Arc::from("point")));
5722
5723        let entries: Vec<_> = overlay.iter().collect();
5724        assert_eq!(
5725            entries,
5726            vec![
5727                (2, OverlayValue::Number(2.0)),
5728                (3, OverlayValue::Number(3.0)),
5729                (5, OverlayValue::Text(Arc::from("point"))),
5730            ]
5731        );
5732        assert_eq!(overlay.iter_points().count(), 1);
5733    }
5734
5735    #[test]
5736    fn overlay_fragment_estimates_follow_encoded_shapes() {
5737        let mut points = Overlay::new();
5738        for idx in 0..512 {
5739            points.set_scalar(idx, OverlayValue::Number(idx as f64));
5740        }
5741
5742        let mut dense = Overlay::new();
5743        dense.apply_fragment(
5744            OverlayFragment::dense_range(
5745                0,
5746                (0..512)
5747                    .map(|idx| OverlayValue::Number(idx as f64))
5748                    .collect::<Vec<_>>(),
5749            )
5750            .unwrap(),
5751        );
5752        assert_eq!(
5753            dense.estimated_bytes(),
5754            dense.debug_recomputed_estimated_bytes()
5755        );
5756        assert!(
5757            dense.estimated_bytes() < points.estimated_bytes(),
5758            "dense fragment should account like encoded lanes, not point-map entries"
5759        );
5760
5761        let mut short_run = Overlay::new();
5762        short_run.apply_fragment(
5763            OverlayFragment::run_range(0, vec![OverlayValue::Number(1.0); 8]).unwrap(),
5764        );
5765        let mut long_run = Overlay::new();
5766        long_run.apply_fragment(
5767            OverlayFragment::run_range(0, vec![OverlayValue::Number(1.0); 4096]).unwrap(),
5768        );
5769        assert_eq!(
5770            short_run.estimated_bytes(),
5771            short_run.debug_recomputed_estimated_bytes()
5772        );
5773        assert_eq!(
5774            long_run.estimated_bytes(),
5775            long_run.debug_recomputed_estimated_bytes()
5776        );
5777        assert_eq!(
5778            short_run.estimated_bytes(),
5779            long_run.estimated_bytes(),
5780            "single-run estimate should scale with run count, not covered rows"
5781        );
5782
5783        let sparse10 = OverlayFragment::sparse_offsets(
5784            (0..10)
5785                .map(|idx| (idx * 3, OverlayValue::Number(idx as f64)))
5786                .collect(),
5787        )
5788        .unwrap();
5789        let sparse20 = OverlayFragment::sparse_offsets(
5790            (0..20)
5791                .map(|idx| (idx * 3, OverlayValue::Number(idx as f64)))
5792                .collect(),
5793        )
5794        .unwrap();
5795        assert!(sparse20.estimated_bytes() > sparse10.estimated_bytes());
5796    }
5797
5798    #[test]
5799    fn overlay_estimated_bytes_stay_consistent_after_split_and_clear() {
5800        let mut overlay = Overlay::new();
5801        overlay.apply_fragment(
5802            OverlayFragment::dense_range(
5803                0,
5804                (0..16)
5805                    .map(|idx| OverlayValue::Number(idx as f64))
5806                    .collect::<Vec<_>>(),
5807            )
5808            .unwrap(),
5809        );
5810        assert_eq!(
5811            overlay.estimated_bytes(),
5812            overlay.debug_recomputed_estimated_bytes()
5813        );
5814
5815        overlay.set_scalar(8, OverlayValue::Text(Arc::from("split")));
5816        assert!(overlay.debug_is_normalized());
5817        assert_eq!(
5818            overlay.estimated_bytes(),
5819            overlay.debug_recomputed_estimated_bytes()
5820        );
5821
5822        overlay.apply_fragment(
5823            OverlayFragment::sparse_offsets(vec![
5824                (0, OverlayValue::Empty),
5825                (15, OverlayValue::Boolean(true)),
5826            ])
5827            .unwrap(),
5828        );
5829        assert!(overlay.debug_is_normalized());
5830        assert_eq!(
5831            overlay.estimated_bytes(),
5832            overlay.debug_recomputed_estimated_bytes()
5833        );
5834
5835        let freed = overlay.clear_all();
5836        assert!(freed > 0);
5837        assert_eq!(overlay.estimated_bytes(), 0);
5838        assert_eq!(overlay.debug_recomputed_estimated_bytes(), 0);
5839        assert!(overlay.is_empty());
5840    }
5841
5842    #[test]
5843    fn overlay_segment_numbers_masks_base_for_non_numeric_overlays() {
5844        let mut user = Overlay::new();
5845        user.set(1, OverlayValue::Text(Arc::from("x")));
5846        user.set(2, OverlayValue::Empty);
5847        user.set(3, OverlayValue::Error(map_error_code(ExcelErrorKind::Div)));
5848        user.set(4, OverlayValue::Pending);
5849        let computed = Overlay::new();
5850        let cascade = OverlayCascade::new(&user, &computed);
5851
5852        let base = Float64Array::from(vec![10.0, 20.0, 30.0, 40.0, 50.0]);
5853        let selected = cascade.select_numbers(0..5, &base);
5854        assert_eq!(selected.value(0), 10.0);
5855        assert!(selected.is_null(1));
5856        assert!(selected.is_null(2));
5857        assert!(selected.is_null(3));
5858        assert!(selected.is_null(4));
5859    }
5860
5861    #[test]
5862    fn overlay_segment_type_tags_preserve_temporal_tags() {
5863        let mut computed = Overlay::new();
5864        computed.set(0, OverlayValue::DateTime(45000.5));
5865        computed.set(1, OverlayValue::Duration(0.25));
5866        let user = Overlay::new();
5867        let cascade = OverlayCascade::new(&user, &computed);
5868
5869        let base = UInt8Array::from(vec![TypeTag::Empty as u8; 2]);
5870        let selected = cascade.select_type_tags(0..2, &base);
5871        assert_eq!(selected.value(0), TypeTag::DateTime as u8);
5872        assert_eq!(selected.value(1), TypeTag::Duration as u8);
5873    }
5874
5875    #[test]
5876    fn overlay_lowered_text_matches_existing_overlay_semantics() {
5877        let mut user = Overlay::new();
5878        user.set(0, OverlayValue::Text(Arc::from("HeLLo")));
5879        user.set(1, OverlayValue::Number(1.5));
5880        user.set(2, OverlayValue::Boolean(true));
5881        user.set(3, OverlayValue::Empty);
5882        let computed = Overlay::new();
5883        let cascade = OverlayCascade::new(&user, &computed);
5884
5885        let base = StringArray::from(vec![Some("A"), Some("B"), Some("C"), Some("D")]);
5886        let selected = cascade.select_lowered_text(0..4, &base);
5887        assert_eq!(selected.value(0), "hello");
5888        assert_eq!(selected.value(1), "1.5");
5889        assert_eq!(selected.value(2), "true");
5890        assert!(selected.is_null(3));
5891    }
5892
5893    fn numeric_sheet(rows: usize) -> ArrowSheet {
5894        let mut b = IngestBuilder::new("S", 1, rows.max(1), crate::engine::DateSystem::Excel1900);
5895        for row in 0..rows {
5896            b.append_row(&[LiteralValue::Number((row + 1) as f64)])
5897                .unwrap();
5898        }
5899        b.finish()
5900    }
5901
5902    fn numbers_for_range(sheet: &ArrowSheet, sr: usize, er: usize) -> Arc<Float64Array> {
5903        let view = sheet.range_view(sr, 0, er, 0);
5904        let segments: Vec<_> = view.numbers_slices().map(|res| res.unwrap()).collect();
5905        assert_eq!(segments.len(), 1);
5906        assert_eq!(segments[0].2.len(), 1);
5907        segments[0].2[0].clone()
5908    }
5909
5910    fn type_tags_for_range(sheet: &ArrowSheet, sr: usize, er: usize) -> Arc<UInt8Array> {
5911        let view = sheet.range_view(sr, 0, er, 0);
5912        let segments: Vec<_> = view.type_tags_slices().map(|res| res.unwrap()).collect();
5913        assert_eq!(segments.len(), 1);
5914        assert_eq!(segments[0].2.len(), 1);
5915        segments[0].2[0].clone()
5916    }
5917
5918    fn lowered_for_range(sheet: &ArrowSheet, sr: usize, er: usize) -> Arc<StringArray> {
5919        let view = sheet.range_view(sr, 0, er, 0);
5920        let segments: Vec<_> = view.lowered_text_slices().map(|res| res.unwrap()).collect();
5921        assert_eq!(segments.len(), 1);
5922        assert_eq!(segments[0].2.len(), 1);
5923        segments[0].2[0].clone()
5924    }
5925
5926    #[test]
5927    fn rangeview_dense_text_masks_base_numbers() {
5928        let mut sheet = numeric_sheet(4);
5929        sheet.columns[0].chunks[0].computed_overlay.apply_fragment(
5930            OverlayFragment::dense_range(
5931                0,
5932                vec![
5933                    OverlayValue::Text(Arc::from("x")),
5934                    OverlayValue::Text(Arc::from("y")),
5935                    OverlayValue::Text(Arc::from("z")),
5936                    OverlayValue::Text(Arc::from("w")),
5937                ],
5938            )
5939            .unwrap(),
5940        );
5941
5942        reset_overlay_select_stats();
5943        let numbers = numbers_for_range(&sheet, 0, 3);
5944        assert_eq!(numbers.null_count(), 4);
5945        let stats = snapshot_overlay_select_stats();
5946        assert_eq!(stats.direct_dense_slices, 1);
5947        assert_eq!(stats.zip_select_calls, 0);
5948    }
5949
5950    #[test]
5951    fn rangeview_empty_dense_masks_base_all_selectors() {
5952        let mut sheet = numeric_sheet(3);
5953        sheet.columns[0].chunks[0]
5954            .computed_overlay
5955            .apply_fragment(OverlayFragment::dense_range(0, vec![OverlayValue::Empty; 3]).unwrap());
5956
5957        reset_overlay_select_stats();
5958        let numbers = numbers_for_range(&sheet, 0, 2);
5959        let type_tags = type_tags_for_range(&sheet, 0, 2);
5960        let lowered = lowered_for_range(&sheet, 0, 2);
5961        assert_eq!(numbers.null_count(), 3);
5962        assert_eq!(lowered.null_count(), 3);
5963        assert_eq!(type_tags.values(), &[TypeTag::Empty as u8; 3]);
5964        let stats = snapshot_overlay_select_stats();
5965        assert_eq!(stats.direct_dense_slices, 3);
5966        assert_eq!(stats.zip_select_calls, 0);
5967    }
5968
5969    #[test]
5970    fn rangeview_pending_masks_base_type_tag_present_lanes_null() {
5971        let mut sheet = numeric_sheet(2);
5972        sheet.columns[0].chunks[0].computed_overlay.apply_fragment(
5973            OverlayFragment::dense_range(0, vec![OverlayValue::Pending; 2]).unwrap(),
5974        );
5975
5976        reset_overlay_select_stats();
5977        let numbers = numbers_for_range(&sheet, 0, 1);
5978        let type_tags = type_tags_for_range(&sheet, 0, 1);
5979        let lowered = lowered_for_range(&sheet, 0, 1);
5980        assert_eq!(numbers.null_count(), 2);
5981        assert_eq!(lowered.null_count(), 2);
5982        assert_eq!(type_tags.values(), &[TypeTag::Pending as u8; 2]);
5983        let stats = snapshot_overlay_select_stats();
5984        assert_eq!(stats.direct_dense_slices, 3);
5985        assert_eq!(stats.zip_select_calls, 0);
5986    }
5987
5988    #[test]
5989    fn rangeview_subrange_inside_dense_fragment_uses_direct_path() {
5990        let mut sheet = numeric_sheet(10);
5991        sheet.columns[0].chunks[0].computed_overlay.apply_fragment(
5992            OverlayFragment::dense_range(
5993                0,
5994                (0..10)
5995                    .map(|row| OverlayValue::Number((row + 10) as f64))
5996                    .collect(),
5997            )
5998            .unwrap(),
5999        );
6000
6001        reset_overlay_select_stats();
6002        let numbers = numbers_for_range(&sheet, 2, 6);
6003        assert_eq!(numbers.len(), 5);
6004        assert_eq!(numbers.value(0), 12.0);
6005        assert_eq!(numbers.value(4), 16.0);
6006        let stats = snapshot_overlay_select_stats();
6007        assert_eq!(stats.direct_dense_slices, 1);
6008        assert_eq!(stats.zip_select_calls, 0);
6009    }
6010
6011    #[test]
6012    fn rangeview_subrange_inside_run_fragment_uses_direct_path() {
6013        let mut sheet = numeric_sheet(10);
6014        sheet.columns[0].chunks[0].computed_overlay.apply_fragment(
6015            OverlayFragment::run_range(0, vec![OverlayValue::Number(7.0); 10]).unwrap(),
6016        );
6017
6018        reset_overlay_select_stats();
6019        let numbers = numbers_for_range(&sheet, 2, 6);
6020        assert_eq!(numbers.len(), 5);
6021        for idx in 0..numbers.len() {
6022            assert_eq!(numbers.value(idx), 7.0);
6023        }
6024        let stats = snapshot_overlay_select_stats();
6025        assert_eq!(stats.direct_run_materializations, 1);
6026        assert_eq!(stats.zip_select_calls, 0);
6027    }
6028
6029    #[test]
6030    fn rangeview_user_partial_wrong_type_masks_computed_numeric() {
6031        let mut sheet = numeric_sheet(5);
6032        let chunk = &mut sheet.columns[0].chunks[0];
6033        chunk.computed_overlay.apply_fragment(
6034            OverlayFragment::dense_range(
6035                0,
6036                (0..5)
6037                    .map(|row| OverlayValue::Number((row + 10) as f64))
6038                    .collect(),
6039            )
6040            .unwrap(),
6041        );
6042        chunk.overlay.apply_fragment(
6043            OverlayFragment::dense_range(2, vec![OverlayValue::Text(Arc::from("mask"))]).unwrap(),
6044        );
6045
6046        reset_overlay_select_stats();
6047        let numbers = numbers_for_range(&sheet, 0, 4);
6048        assert_eq!(numbers.value(0), 10.0);
6049        assert_eq!(numbers.value(1), 11.0);
6050        assert!(numbers.is_null(2));
6051        assert_eq!(numbers.value(3), 13.0);
6052        assert_eq!(numbers.value(4), 14.0);
6053        let stats = snapshot_overlay_select_stats();
6054        assert_eq!(stats.direct_dense_slices, 0);
6055        assert_eq!(stats.zip_select_calls, 1);
6056        assert_eq!(stats.partial_dense_intersections, 2);
6057    }
6058
6059    #[test]
6060    fn rangeview_computed_full_cover_user_no_overlap_uses_computed_direct() {
6061        let mut sheet = numeric_sheet(5);
6062        let chunk = &mut sheet.columns[0].chunks[0];
6063        chunk.computed_overlay.apply_fragment(
6064            OverlayFragment::dense_range(0, vec![OverlayValue::Number(3.0); 5]).unwrap(),
6065        );
6066        chunk
6067            .overlay
6068            .set_scalar(10, OverlayValue::Text(Arc::from("outside")));
6069
6070        reset_overlay_select_stats();
6071        let numbers = numbers_for_range(&sheet, 0, 4);
6072        assert_eq!(numbers.value(0), 3.0);
6073        assert_eq!(numbers.value(4), 3.0);
6074        let stats = snapshot_overlay_select_stats();
6075        assert_eq!(stats.direct_dense_slices, 1);
6076        assert_eq!(stats.zip_select_calls, 0);
6077    }
6078
6079    #[test]
6080    fn rangeview_user_full_cover_ignores_computed() {
6081        let mut sheet = numeric_sheet(4);
6082        let chunk = &mut sheet.columns[0].chunks[0];
6083        chunk.computed_overlay.apply_fragment(
6084            OverlayFragment::dense_range(0, vec![OverlayValue::Number(99.0); 4]).unwrap(),
6085        );
6086        chunk.overlay.apply_fragment(
6087            OverlayFragment::dense_range(0, vec![OverlayValue::Text(Arc::from("user")); 4])
6088                .unwrap(),
6089        );
6090
6091        reset_overlay_select_stats();
6092        let numbers = numbers_for_range(&sheet, 0, 3);
6093        assert_eq!(numbers.null_count(), 4);
6094        let stats = snapshot_overlay_select_stats();
6095        assert_eq!(stats.direct_dense_slices, 1);
6096        assert_eq!(stats.zip_select_calls, 0);
6097    }
6098
6099    #[test]
6100    fn rangeview_point_overlay_still_matches_legacy_scalar_path() {
6101        let mut sheet = numeric_sheet(3);
6102        sheet.columns[0].chunks[0]
6103            .computed_overlay
6104            .set_scalar(1, OverlayValue::Text(Arc::from("point")));
6105
6106        reset_overlay_select_stats();
6107        let numbers = numbers_for_range(&sheet, 0, 2);
6108        assert_eq!(numbers.value(0), 1.0);
6109        assert!(numbers.is_null(1));
6110        assert_eq!(numbers.value(2), 3.0);
6111        let stats = snapshot_overlay_select_stats();
6112        assert_eq!(stats.zip_select_calls, 1);
6113        assert_eq!(stats.point_entries_applied, 1);
6114        assert_eq!(stats.row_scalar_fallbacks, 0);
6115    }
6116
6117    #[test]
6118    fn rangeview_multi_fragment_full_union_does_not_use_direct_path() {
6119        let mut sheet = numeric_sheet(4);
6120        let chunk = &mut sheet.columns[0].chunks[0];
6121        chunk.computed_overlay.apply_fragment(
6122            OverlayFragment::dense_range(0, vec![OverlayValue::Number(10.0); 2]).unwrap(),
6123        );
6124        chunk.computed_overlay.apply_fragment(
6125            OverlayFragment::dense_range(2, vec![OverlayValue::Number(20.0); 2]).unwrap(),
6126        );
6127
6128        reset_overlay_select_stats();
6129        let numbers = numbers_for_range(&sheet, 0, 3);
6130        assert_eq!(numbers.value(0), 10.0);
6131        assert_eq!(numbers.value(1), 10.0);
6132        assert_eq!(numbers.value(2), 20.0);
6133        assert_eq!(numbers.value(3), 20.0);
6134        let stats = snapshot_overlay_select_stats();
6135        assert_eq!(stats.direct_dense_slices, 0);
6136        assert_eq!(stats.zip_select_calls, 1);
6137        assert_eq!(stats.partial_dense_intersections, 2);
6138    }
6139
6140    #[test]
6141    fn rangeview_lowered_text_fragment_semantics_match_scalar_semantics() {
6142        let mut sheet = numeric_sheet(8);
6143        sheet.columns[0].chunks[0].computed_overlay.apply_fragment(
6144            OverlayFragment::dense_range(
6145                0,
6146                vec![
6147                    OverlayValue::Text(Arc::from("HeLLo")),
6148                    OverlayValue::Number(1.5),
6149                    OverlayValue::DateTime(45000.25),
6150                    OverlayValue::Duration(0.5),
6151                    OverlayValue::Boolean(true),
6152                    OverlayValue::Empty,
6153                    OverlayValue::Error(map_error_code(ExcelErrorKind::Div)),
6154                    OverlayValue::Pending,
6155                ],
6156            )
6157            .unwrap(),
6158        );
6159
6160        reset_overlay_select_stats();
6161        let lowered = lowered_for_range(&sheet, 0, 7);
6162        assert_eq!(lowered.value(0), "hello");
6163        assert_eq!(lowered.value(1), "1.5");
6164        assert_eq!(lowered.value(2), "45000.25");
6165        assert_eq!(lowered.value(3), "0.5");
6166        assert_eq!(lowered.value(4), "true");
6167        assert!(lowered.is_null(5));
6168        assert!(lowered.is_null(6));
6169        assert!(lowered.is_null(7));
6170        let stats = snapshot_overlay_select_stats();
6171        assert_eq!(stats.direct_dense_slices, 1);
6172        assert_eq!(stats.zip_select_calls, 0);
6173    }
6174
6175    #[test]
6176    fn row_chunk_slices_shape() {
6177        // chunk_rows=2 leads to two slices for 3 rows
6178        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
6179        b.append_row(&[LiteralValue::Text("a".into()), LiteralValue::Number(1.0)])
6180            .unwrap();
6181        b.append_row(&[LiteralValue::Text("b".into()), LiteralValue::Number(2.0)])
6182            .unwrap();
6183        b.append_row(&[LiteralValue::Text("c".into()), LiteralValue::Number(3.0)])
6184            .unwrap();
6185        let sheet = b.finish();
6186        let rv = sheet.range_view(0, 0, 2, 1);
6187        let slices: Vec<_> = rv.iter_row_chunks().map(|r| r.unwrap()).collect();
6188        assert_eq!(slices.len(), 2);
6189        assert_eq!(slices[0].row_start, 0);
6190        assert_eq!(slices[0].row_len, 2);
6191        assert_eq!(slices[0].cols.len(), 2);
6192        assert_eq!(slices[1].row_start, 2);
6193        assert_eq!(slices[1].row_len, 1);
6194        assert_eq!(slices[1].cols.len(), 2);
6195    }
6196
6197    #[test]
6198    fn oob_columns_are_padded() {
6199        // Build with 2 columns; request 3 columns (ec beyond last col)
6200        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
6201        b.append_row(&[LiteralValue::Number(1.0), LiteralValue::Text("a".into())])
6202            .unwrap();
6203        b.append_row(&[LiteralValue::Number(2.0), LiteralValue::Text("b".into())])
6204            .unwrap();
6205        let sheet = b.finish();
6206        // Request cols [0..=2] → 3 columns with padding
6207        let rv = sheet.range_view(0, 0, 1, 2);
6208        assert_eq!(rv.dims(), (2, 3));
6209        let slices: Vec<_> = rv.iter_row_chunks().map(|r| r.unwrap()).collect();
6210        assert!(!slices.is_empty());
6211        for cs in &slices {
6212            assert_eq!(cs.cols.len(), 3);
6213        }
6214        // Also validate typed slices return 3 entries per segment
6215        for res in rv.numbers_slices() {
6216            let (_rs, _rl, cols) = res.unwrap();
6217            assert_eq!(cols.len(), 3);
6218        }
6219        for res in rv.booleans_slices() {
6220            let (_rs, _rl, cols) = res.unwrap();
6221            assert_eq!(cols.len(), 3);
6222        }
6223        for res in rv.text_slices() {
6224            let (_rs, _rl, cols) = res.unwrap();
6225            assert_eq!(cols.len(), 3);
6226        }
6227        for res in rv.errors_slices() {
6228            let (_rs, _rl, cols) = res.unwrap();
6229            assert_eq!(cols.len(), 3);
6230        }
6231        for res in rv.lowered_text_slices() {
6232            let (_rs, _rl, cols) = res.unwrap();
6233            assert_eq!(cols.len(), 3);
6234        }
6235    }
6236
6237    #[test]
6238    fn reversed_range_is_empty() {
6239        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
6240        b.append_row(&[LiteralValue::Number(1.0)]).unwrap();
6241        b.append_row(&[LiteralValue::Number(2.0)]).unwrap();
6242        let sheet = b.finish();
6243        let rv = sheet.range_view(3, 0, 1, 0); // er < sr
6244        assert_eq!(rv.dims(), (0, 0));
6245        assert!(rv.iter_row_chunks().next().is_none());
6246        assert_eq!(rv.get_cell(0, 0), LiteralValue::Empty);
6247    }
6248
6249    #[test]
6250    fn chunk_alignment_invariant() {
6251        let mut b = IngestBuilder::new("S", 3, 2, crate::engine::DateSystem::Excel1900);
6252        // 5 rows, 2-row chunks => 3 chunks (2,2,1)
6253        for r in 0..5 {
6254            b.append_row(&[
6255                LiteralValue::Number(r as f64),
6256                LiteralValue::Text(format!("{r}")),
6257                if r % 2 == 0 {
6258                    LiteralValue::Empty
6259                } else {
6260                    LiteralValue::Boolean(true)
6261                },
6262            ])
6263            .unwrap();
6264        }
6265        let sheet = b.finish();
6266        // chunk_starts should be [0,2,4]
6267        assert_eq!(sheet.chunk_starts, vec![0, 2, 4]);
6268        // All columns must share per-chunk lengths equal to [2,2,1]
6269        let lens0: Vec<usize> = sheet.columns[0]
6270            .chunks
6271            .iter()
6272            .map(|ch| ch.type_tag.len())
6273            .collect();
6274        for col in &sheet.columns[1..] {
6275            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
6276            assert_eq!(lens, lens0);
6277        }
6278    }
6279
6280    #[test]
6281    fn chunking_splits_rows() {
6282        // Two columns, chunk size 2 → expect two chunks
6283        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
6284        let rows = vec![
6285            vec![LiteralValue::Number(1.0), LiteralValue::Text("a".into())],
6286            vec![LiteralValue::Empty, LiteralValue::Text("b".into())],
6287            vec![LiteralValue::Boolean(true), LiteralValue::Empty],
6288        ];
6289        for r in rows {
6290            b.append_row(&r).unwrap();
6291        }
6292        let sheet = b.finish();
6293        assert_eq!(sheet.columns[0].chunks.len(), 2);
6294        assert_eq!(sheet.columns[1].chunks.len(), 2);
6295        assert_eq!(sheet.columns[0].chunks[0].numbers_or_null().len(), 2);
6296        assert_eq!(sheet.columns[0].chunks[1].numbers_or_null().len(), 1);
6297    }
6298
6299    #[test]
6300    fn pending_is_not_error() {
6301        let mut b = IngestBuilder::new("S", 1, 8, crate::engine::DateSystem::Excel1900);
6302        b.append_row(&[LiteralValue::Pending]).unwrap();
6303        let sheet = b.finish();
6304        let ch = &sheet.columns[0].chunks[0];
6305        // tag is Pending
6306        assert_eq!(ch.type_tag.values()[0], super::TypeTag::Pending as u8);
6307        // errors lane is effectively null
6308        let errs = ch.errors_or_null();
6309        assert_eq!(errs.null_count(), 1);
6310    }
6311
6312    #[test]
6313    fn all_null_numeric_lane_uses_null_array() {
6314        // Only text values in first column → numbers lane should be all null with correct dtype
6315        let mut b = IngestBuilder::new("S", 1, 16, crate::engine::DateSystem::Excel1900);
6316        b.append_row(&[LiteralValue::Text("a".into())]).unwrap();
6317        b.append_row(&[LiteralValue::Text("".into())]).unwrap();
6318        b.append_row(&[LiteralValue::Text("b".into())]).unwrap();
6319        let sheet = b.finish();
6320        let ch = &sheet.columns[0].chunks[0];
6321        let nums = ch.numbers_or_null();
6322        assert_eq!(nums.len(), 3);
6323        assert_eq!(nums.null_count(), 3);
6324        assert_eq!(nums.data_type(), &DataType::Float64);
6325    }
6326
6327    #[test]
6328    fn row_insert_delete_across_chunk_boundaries_with_overlays() {
6329        // Build 1 column, chunk size 4, 10 rows -> chunks at [0..4],[4..8],[8..10]
6330        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
6331        for _ in 0..10 {
6332            b.append_row(&[LiteralValue::Empty]).unwrap();
6333        }
6334        let mut sheet = b.finish();
6335        // Add overlays at row 3 and row 4
6336        {
6337            let (c0, o0) = sheet.chunk_of_row(3).unwrap();
6338            sheet.columns[0].chunks[c0]
6339                .overlay
6340                .set(o0, OverlayValue::Number(30.0));
6341            let (c1, o1) = sheet.chunk_of_row(4).unwrap();
6342            sheet.columns[0].chunks[c1]
6343                .overlay
6344                .set(o1, OverlayValue::Number(40.0));
6345        }
6346        // Insert 2 rows before row 4 (at chunk boundary)
6347        sheet.insert_rows(4, 2);
6348        assert_eq!(sheet.nrows, 12);
6349        // Validate overlays moved correctly: 3 stays, 4 becomes Empty, 6 has 40
6350        let av = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
6351        assert_eq!(av.get_cell(3, 0), LiteralValue::Number(30.0));
6352        assert_eq!(av.get_cell(4, 0), LiteralValue::Empty);
6353        assert_eq!(av.get_cell(6, 0), LiteralValue::Number(40.0));
6354
6355        // Now delete 3 rows starting at 3: removes rows 3,4,5 → moves 40.0 from 6 → 3
6356        sheet.delete_rows(3, 3);
6357        assert_eq!(sheet.nrows, 9);
6358        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
6359        assert_eq!(av2.get_cell(3, 0), LiteralValue::Number(40.0));
6360        // All columns share chunk lengths; chunk_starts monotonic and cover nrows
6361        let lens0: Vec<usize> = sheet.columns[0]
6362            .chunks
6363            .iter()
6364            .map(|ch| ch.type_tag.len())
6365            .collect();
6366        for col in &sheet.columns {
6367            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
6368            assert_eq!(lens, lens0);
6369        }
6370        // chunk_starts should be monotonic and final chunk end == nrows
6371        assert!(sheet.chunk_starts.windows(2).all(|w| w[0] < w[1]));
6372        let last_start = *sheet.chunk_starts.last().unwrap_or(&0);
6373        let last_len = sheet.columns[0]
6374            .chunks
6375            .last()
6376            .map(|c| c.type_tag.len())
6377            .unwrap_or(0);
6378        assert_eq!(last_start + last_len, sheet.nrows as usize);
6379    }
6380
6381    #[test]
6382    fn row_insert_delete_preserves_user_dense_fragments() {
6383        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
6384        for _ in 0..10 {
6385            b.append_row(&[LiteralValue::Empty]).unwrap();
6386        }
6387        let mut sheet = b.finish();
6388
6389        let (ch_idx, off) = sheet.chunk_of_row(1).unwrap();
6390        sheet.columns[0]
6391            .chunk_mut(ch_idx)
6392            .unwrap()
6393            .overlay
6394            .apply_fragment(
6395                OverlayFragment::dense_range(
6396                    off,
6397                    vec![
6398                        OverlayValue::Number(10.0),
6399                        OverlayValue::Number(20.0),
6400                        OverlayValue::Number(30.0),
6401                    ],
6402                )
6403                .unwrap(),
6404            );
6405
6406        let before = column_overlay_stats(&sheet, 0, false);
6407        assert_eq!(before.dense_fragments, 1);
6408        assert_eq!(before.sparse_fragments, 0);
6409        assert_column_overlays_normalized(&sheet, 0);
6410
6411        sheet.insert_rows(2, 2);
6412        assert_eq!(sheet.nrows, 12);
6413        let av = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
6414        assert_eq!(av.get_cell(1, 0), LiteralValue::Number(10.0));
6415        assert_eq!(av.get_cell(2, 0), LiteralValue::Empty);
6416        assert_eq!(av.get_cell(3, 0), LiteralValue::Empty);
6417        assert_eq!(av.get_cell(4, 0), LiteralValue::Number(20.0));
6418        assert_eq!(av.get_cell(5, 0), LiteralValue::Number(30.0));
6419        let after_insert = column_overlay_stats(&sheet, 0, false);
6420        assert_eq!(after_insert.sparse_fragments, 0);
6421        assert!(after_insert.dense_fragments >= 2);
6422        assert_column_overlays_normalized(&sheet, 0);
6423
6424        sheet.delete_rows(2, 2);
6425        assert_eq!(sheet.nrows, 10);
6426        let av = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
6427        assert_eq!(av.get_cell(1, 0), LiteralValue::Number(10.0));
6428        assert_eq!(av.get_cell(2, 0), LiteralValue::Number(20.0));
6429        assert_eq!(av.get_cell(3, 0), LiteralValue::Number(30.0));
6430        let after_delete = column_overlay_stats(&sheet, 0, false);
6431        assert_eq!(after_delete.sparse_fragments, 0);
6432        assert!(after_delete.dense_fragments >= 1);
6433        assert_column_overlays_normalized(&sheet, 0);
6434    }
6435
6436    #[test]
6437    fn row_insert_delete_preserves_computed_empty_run_fragments() {
6438        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
6439        for row in 0..8 {
6440            b.append_row(&[LiteralValue::Number((row + 1) as f64)])
6441                .unwrap();
6442        }
6443        let mut sheet = b.finish();
6444
6445        let (ch_idx, off) = sheet.chunk_of_row(1).unwrap();
6446        sheet.columns[0]
6447            .chunk_mut(ch_idx)
6448            .unwrap()
6449            .computed_overlay
6450            .apply_fragment(
6451                OverlayFragment::run_range(
6452                    off,
6453                    vec![
6454                        OverlayValue::Empty,
6455                        OverlayValue::Empty,
6456                        OverlayValue::Empty,
6457                    ],
6458                )
6459                .unwrap(),
6460            );
6461
6462        let before = column_overlay_stats(&sheet, 0, true);
6463        assert_eq!(before.run_fragments, 1);
6464        assert_eq!(before.sparse_fragments, 0);
6465        assert_column_overlays_normalized(&sheet, 0);
6466
6467        sheet.insert_rows(2, 1);
6468        assert_eq!(sheet.nrows, 9);
6469        let av = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
6470        assert_eq!(av.get_cell(1, 0), LiteralValue::Empty);
6471        assert_eq!(av.get_cell(2, 0), LiteralValue::Empty);
6472        assert_eq!(av.get_cell(3, 0), LiteralValue::Empty);
6473        assert_eq!(av.get_cell(4, 0), LiteralValue::Empty);
6474        assert_eq!(av.get_cell(5, 0), LiteralValue::Number(5.0));
6475        let after_insert = column_overlay_stats(&sheet, 0, true);
6476        assert_eq!(after_insert.sparse_fragments, 0);
6477        assert!(after_insert.run_fragments >= 2);
6478        assert_column_overlays_normalized(&sheet, 0);
6479
6480        sheet.delete_rows(2, 1);
6481        assert_eq!(sheet.nrows, 8);
6482        let av = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
6483        assert_eq!(av.get_cell(1, 0), LiteralValue::Empty);
6484        assert_eq!(av.get_cell(2, 0), LiteralValue::Empty);
6485        assert_eq!(av.get_cell(3, 0), LiteralValue::Empty);
6486        assert_eq!(av.get_cell(4, 0), LiteralValue::Number(5.0));
6487        let after_delete = column_overlay_stats(&sheet, 0, true);
6488        assert_eq!(after_delete.sparse_fragments, 0);
6489        assert!(after_delete.run_fragments >= 1);
6490        assert_column_overlays_normalized(&sheet, 0);
6491    }
6492
6493    #[test]
6494    fn column_insert_delete_retains_chunk_alignment() {
6495        let mut b = IngestBuilder::new("S", 3, 3, crate::engine::DateSystem::Excel1900);
6496        for _ in 0..5 {
6497            b.append_row(&[
6498                LiteralValue::Empty,
6499                LiteralValue::Empty,
6500                LiteralValue::Empty,
6501            ])
6502            .unwrap();
6503        }
6504        let mut sheet = b.finish();
6505        // Record reference chunk lengths of first column
6506        let ref_lens: Vec<usize> = sheet.columns[0]
6507            .chunks
6508            .iter()
6509            .map(|ch| ch.type_tag.len())
6510            .collect();
6511        // Insert 2 columns before index 1
6512        sheet.insert_columns(1, 2);
6513        assert_eq!(sheet.columns.len(), 5);
6514        for col in &sheet.columns {
6515            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
6516            assert_eq!(lens, ref_lens);
6517        }
6518        let starts_before = sheet.chunk_starts.clone();
6519        // Delete 2 columns starting at index 2 → back to 3 columns
6520        sheet.delete_columns(2, 2);
6521        assert_eq!(sheet.columns.len(), 3);
6522        for col in &sheet.columns {
6523            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
6524            assert_eq!(lens, ref_lens);
6525        }
6526        // chunk_starts unchanged by column operations
6527        assert_eq!(sheet.chunk_starts, starts_before);
6528    }
6529
6530    #[test]
6531    fn multiple_adjacent_row_ops_overlay_mixed_types() {
6532        use formualizer_common::ExcelErrorKind;
6533        // Two columns to ensure alignment preserved across columns
6534        let mut b = IngestBuilder::new("S", 2, 3, crate::engine::DateSystem::Excel1900);
6535        for _ in 0..9 {
6536            b.append_row(&[LiteralValue::Empty, LiteralValue::Empty])
6537                .unwrap();
6538        }
6539        let mut sheet = b.finish();
6540        // Overlays at rows (0-based): 2->Number, 3->Text, 5->Boolean, 6->Error, 8->Empty
6541        // Column 0 only
6542        let set_ov = |sh: &mut ArrowSheet, row: usize, ov: OverlayValue| {
6543            let (ch_i, off) = sh.chunk_of_row(row).unwrap();
6544            let _ = sh.columns[0].chunks[ch_i].overlay.set(off, ov);
6545        };
6546        set_ov(&mut sheet, 2, OverlayValue::Number(12.5));
6547        set_ov(&mut sheet, 3, OverlayValue::Text(Arc::from("hello")));
6548        set_ov(&mut sheet, 5, OverlayValue::Boolean(true));
6549        set_ov(
6550            &mut sheet,
6551            6,
6552            OverlayValue::Error(map_error_code(ExcelErrorKind::Div)),
6553        );
6554        set_ov(&mut sheet, 8, OverlayValue::Empty);
6555
6556        // Insert 1 row before index 3
6557        sheet.insert_rows(3, 1);
6558        // Expected new positions: 2->2 (unchanged), 3->4, 5->6, 6->7, 8->9
6559        let av1 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
6560        assert_eq!(av1.get_cell(2, 0), LiteralValue::Number(12.5));
6561        assert_eq!(av1.get_cell(4, 0), LiteralValue::Text("hello".into()));
6562        assert_eq!(av1.get_cell(6, 0), LiteralValue::Boolean(true));
6563        match av1.get_cell(7, 0) {
6564            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
6565            other => panic!("expected error at row 7, got {other:?}"),
6566        }
6567        assert_eq!(av1.get_cell(9, 0), LiteralValue::Empty);
6568
6569        // Insert 2 rows before index 4 (adjacent to previous region)
6570        sheet.insert_rows(4, 2);
6571        // Now positions: 2->2, 4->6, 6->8, 7->9, 9->11
6572        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
6573        assert_eq!(av2.get_cell(2, 0), LiteralValue::Number(12.5));
6574        assert_eq!(av2.get_cell(6, 0), LiteralValue::Text("hello".into()));
6575        assert_eq!(av2.get_cell(8, 0), LiteralValue::Boolean(true));
6576        match av2.get_cell(9, 0) {
6577            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
6578            other => panic!("expected error at row 9, got {other:?}"),
6579        }
6580        assert_eq!(av2.get_cell(11, 0), LiteralValue::Empty);
6581
6582        // Delete 2 rows starting at index 6 → removes the text at 6 and one empty row
6583        sheet.delete_rows(6, 2);
6584        let av3 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 0);
6585        // Remaining expected: 2->Number 12.5, 6 (was 8)->true, 7 (was 9)->#DIV/0!, 9 (was 11)->Empty
6586        assert_eq!(av3.get_cell(2, 0), LiteralValue::Number(12.5));
6587        assert_eq!(av3.get_cell(6, 0), LiteralValue::Boolean(true));
6588        match av3.get_cell(7, 0) {
6589            LiteralValue::Error(e) => assert_eq!(e.kind, ExcelErrorKind::Div),
6590            other => panic!("expected error at row 8, got {other:?}"),
6591        }
6592        assert_eq!(av3.get_cell(9, 0), LiteralValue::Empty);
6593
6594        // Alignment checks
6595        let lens0: Vec<usize> = sheet.columns[0]
6596            .chunks
6597            .iter()
6598            .map(|ch| ch.type_tag.len())
6599            .collect();
6600        for col in &sheet.columns {
6601            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
6602            assert_eq!(lens, lens0);
6603        }
6604        // chunk_starts monotonically increasing and cover nrows
6605        assert!(sheet.chunk_starts.windows(2).all(|w| w[0] < w[1]));
6606        let last_start = *sheet.chunk_starts.last().unwrap_or(&0);
6607        let last_len = sheet.columns[0]
6608            .chunks
6609            .last()
6610            .map(|c| c.type_tag.len())
6611            .unwrap_or(0);
6612        assert_eq!(last_start + last_len, sheet.nrows as usize);
6613    }
6614
6615    #[test]
6616    fn multiple_adjacent_column_ops_alignment() {
6617        // Start with 2 columns, chunk_rows=2, rows=5
6618        let mut b = IngestBuilder::new("S", 2, 2, crate::engine::DateSystem::Excel1900);
6619        for _ in 0..5 {
6620            b.append_row(&[LiteralValue::Empty, LiteralValue::Empty])
6621                .unwrap();
6622        }
6623        let mut sheet = b.finish();
6624        let ref_lens: Vec<usize> = sheet.columns[0]
6625            .chunks
6626            .iter()
6627            .map(|ch| ch.type_tag.len())
6628            .collect();
6629        // Insert 1 at start, then 2 at index 2 → columns = 5
6630        sheet.insert_columns(0, 1);
6631        sheet.insert_columns(2, 2);
6632        assert_eq!(sheet.columns.len(), 5);
6633        for col in &sheet.columns {
6634            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
6635            assert_eq!(lens, ref_lens);
6636        }
6637        let starts_before = sheet.chunk_starts.clone();
6638        // Delete 1 at index 1, then 2 at the end if available
6639        sheet.delete_columns(1, 1);
6640        let remain = sheet.columns.len();
6641        if remain >= 3 {
6642            sheet.delete_columns(remain - 2, 2);
6643        }
6644        for col in &sheet.columns {
6645            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
6646            assert_eq!(lens, ref_lens);
6647        }
6648        assert_eq!(sheet.chunk_starts, starts_before);
6649    }
6650
6651    #[test]
6652    fn overlays_on_multiple_columns_row_col_ops() {
6653        // 3 columns, chunk_rows=3, rows=6 → chunks [0..3), [3..6)
6654        let mut b = IngestBuilder::new("S", 3, 3, crate::engine::DateSystem::Excel1900);
6655        for _ in 0..6 {
6656            b.append_row(&[
6657                LiteralValue::Empty,
6658                LiteralValue::Empty,
6659                LiteralValue::Empty,
6660            ])
6661            .unwrap();
6662        }
6663        let mut sheet = b.finish();
6664        // Overlays at row2 and row3 across columns with different types
6665        let set_ov = |sh: &mut ArrowSheet, col: usize, row: usize, ov: OverlayValue| {
6666            let (ch_i, off) = sh.chunk_of_row(row).unwrap();
6667            let _ = sh.columns[col].chunks[ch_i].overlay.set(off, ov);
6668        };
6669        set_ov(&mut sheet, 0, 2, OverlayValue::Number(12.0));
6670        set_ov(&mut sheet, 1, 2, OverlayValue::Text(Arc::from("xx")));
6671        set_ov(&mut sheet, 2, 2, OverlayValue::Boolean(true));
6672        set_ov(&mut sheet, 0, 3, OverlayValue::Number(33.0));
6673        set_ov(&mut sheet, 1, 3, OverlayValue::Text(Arc::from("yy")));
6674        set_ov(&mut sheet, 2, 3, OverlayValue::Boolean(false));
6675
6676        // Insert a row at boundary (before row index 3)
6677        sheet.insert_rows(3, 1);
6678        // Now original row>=3 shift down by 1
6679        let av = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 2);
6680        // Row 2 values unchanged
6681        assert_eq!(av.get_cell(2, 0), LiteralValue::Number(12.0));
6682        assert_eq!(av.get_cell(2, 1), LiteralValue::Text("xx".into()));
6683        assert_eq!(av.get_cell(2, 2), LiteralValue::Boolean(true));
6684        // Row 3 became Empty (inserted)
6685        assert_eq!(av.get_cell(3, 0), LiteralValue::Empty);
6686        // Row 4 holds old row 3 overlays
6687        assert_eq!(av.get_cell(4, 0), LiteralValue::Number(33.0));
6688        assert_eq!(av.get_cell(4, 1), LiteralValue::Text("yy".into()));
6689        assert_eq!(av.get_cell(4, 2), LiteralValue::Boolean(false));
6690
6691        // Delete column 1 (middle), values shift left
6692        sheet.delete_columns(1, 1);
6693        let av2 = sheet.range_view(0, 0, (sheet.nrows - 1) as usize, 1);
6694        assert_eq!(av2.get_cell(2, 0), LiteralValue::Number(12.0));
6695        // Column 1 now was old column 2
6696        assert_eq!(av2.get_cell(2, 1), LiteralValue::Boolean(true));
6697        assert_eq!(av2.get_cell(4, 0), LiteralValue::Number(33.0));
6698        assert_eq!(av2.get_cell(4, 1), LiteralValue::Boolean(false));
6699
6700        // Alignment preserved
6701        let lens0: Vec<usize> = sheet.columns[0]
6702            .chunks
6703            .iter()
6704            .map(|ch| ch.type_tag.len())
6705            .collect();
6706        for col in &sheet.columns {
6707            let lens: Vec<usize> = col.chunks.iter().map(|ch| ch.type_tag.len()).collect();
6708            assert_eq!(lens, lens0);
6709        }
6710    }
6711
6712    #[test]
6713    fn effective_slices_overlay_precedence_numbers_text() {
6714        // 1 column, chunk_rows=3, rows=6. Base numbers in lane; overlays include text on row1 and number on row4.
6715        let mut b = IngestBuilder::new("S", 1, 3, crate::engine::DateSystem::Excel1900);
6716        for i in 0..6 {
6717            b.append_row(&[LiteralValue::Number((i + 1) as f64)])
6718                .unwrap();
6719        }
6720        let mut sheet = b.finish();
6721        // Overlays: row1 -> Text("X"), row4 -> Number(99)
6722        let (c1, o1) = sheet.chunk_of_row(1).unwrap();
6723        sheet.columns[0].chunks[c1]
6724            .overlay
6725            .set(o1, OverlayValue::Text(Arc::from("X")));
6726        let (c4, o4) = sheet.chunk_of_row(4).unwrap();
6727        sheet.columns[0].chunks[c4]
6728            .overlay
6729            .set(o4, OverlayValue::Number(99.0));
6730
6731        let av = sheet.range_view(0, 0, 5, 0);
6732        // Validate numbers_slices: row1 should be null (text overlay), row4 should be 99.0, others base
6733        let mut numeric: Vec<Option<f64>> = vec![None; 6];
6734        for res in av.numbers_slices() {
6735            let (row_start, row_len, cols) = res.unwrap();
6736            let a = &cols[0];
6737            for i in 0..row_len {
6738                let idx = row_start + i;
6739                numeric[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
6740            }
6741        }
6742        assert_eq!(numeric[0], Some(1.0));
6743        assert_eq!(numeric[1], None); // overshadowed by text overlay
6744        assert_eq!(numeric[2], Some(3.0));
6745        assert_eq!(numeric[3], Some(4.0));
6746        assert_eq!(numeric[4], Some(99.0));
6747        assert_eq!(numeric[5], Some(6.0));
6748
6749        // Validate text_slices: row1 has "X", others null
6750        let mut texts: Vec<Option<String>> = vec![None; 6];
6751        for res in av.text_slices() {
6752            let (row_start, row_len, cols) = res.unwrap();
6753            let a = cols[0].as_any().downcast_ref::<StringArray>().unwrap();
6754            for i in 0..row_len {
6755                let idx = row_start + i;
6756                texts[idx] = if a.is_null(i) {
6757                    None
6758                } else {
6759                    Some(a.value(i).to_string())
6760                };
6761            }
6762        }
6763        assert_eq!(texts[1].as_deref(), Some("X"));
6764        assert!(texts[0].is_none());
6765        assert!(texts[2].is_none());
6766        assert!(texts[3].is_none());
6767        assert!(texts[4].is_none());
6768        assert!(texts[5].is_none());
6769    }
6770
6771    #[test]
6772    fn effective_slices_overlay_precedence_booleans() {
6773        // Base booleans over 1 column; overlays include boolean and non-boolean types.
6774        let mut b = IngestBuilder::new("S", 1, 4, crate::engine::DateSystem::Excel1900);
6775        for i in 0..6 {
6776            let v = if i % 2 == 0 {
6777                LiteralValue::Boolean(true)
6778            } else {
6779                LiteralValue::Boolean(false)
6780            };
6781            b.append_row(&[v]).unwrap();
6782        }
6783        let mut sheet = b.finish();
6784        // Overlays: row1 -> Boolean(true), row2 -> Text("T")
6785        let (c1, o1) = sheet.chunk_of_row(1).unwrap();
6786        sheet.columns[0].chunks[c1]
6787            .overlay
6788            .set(o1, OverlayValue::Boolean(true));
6789        let (c2, o2) = sheet.chunk_of_row(2).unwrap();
6790        sheet.columns[0].chunks[c2]
6791            .overlay
6792            .set(o2, OverlayValue::Text(Arc::from("T")));
6793
6794        let av = sheet.range_view(0, 0, 5, 0);
6795        // Validate booleans_slices: row1 should be true (overlay), row2 should be null (text overlay), others base
6796        let mut bools: Vec<Option<bool>> = vec![None; 6];
6797        for res in av.booleans_slices() {
6798            let (row_start, row_len, cols) = res.unwrap();
6799            let a = &cols[0];
6800            for i in 0..row_len {
6801                let idx = row_start + i;
6802                bools[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
6803            }
6804        }
6805        assert_eq!(bools[0], Some(true));
6806        assert_eq!(bools[1], Some(true)); // overlay to true
6807        assert_eq!(bools[2], None); // overshadowed by text overlay
6808        // spot-check others remain base
6809        assert_eq!(bools[3], Some(false));
6810    }
6811
6812    #[test]
6813    fn effective_slices_overlay_precedence_errors() {
6814        // Base numbers; overlay an error at one row and ensure errors_slices reflect it.
6815        let mut b = IngestBuilder::new("S", 1, 3, crate::engine::DateSystem::Excel1900);
6816        for i in 0..6 {
6817            b.append_row(&[LiteralValue::Number((i + 1) as f64)])
6818                .unwrap();
6819        }
6820        let mut sheet = b.finish();
6821        // Overlay error at row 4
6822        let (c4, o4) = sheet.chunk_of_row(4).unwrap();
6823        sheet.columns[0].chunks[c4]
6824            .overlay
6825            .set(o4, OverlayValue::Error(map_error_code(ExcelErrorKind::Div)));
6826
6827        let av = sheet.range_view(0, 0, 5, 0);
6828        let mut errs: Vec<Option<u8>> = vec![None; 6];
6829        for res in av.errors_slices() {
6830            let (row_start, row_len, cols) = res.unwrap();
6831            let a = &cols[0];
6832            for i in 0..row_len {
6833                let idx = row_start + i;
6834                errs[idx] = if a.is_null(i) { None } else { Some(a.value(i)) };
6835            }
6836        }
6837        assert_eq!(errs[4], Some(map_error_code(ExcelErrorKind::Div)));
6838        assert!(errs[3].is_none());
6839    }
6840}