formualizer_eval/engine/
eval.rs

1use crate::SheetId;
2use crate::arrow_store::SheetStore;
3use crate::engine::named_range::{NameScope, NamedDefinition};
4use crate::engine::range_view::RangeView;
5use crate::engine::spill::{RegionLockManager, SpillMeta, SpillShape};
6use crate::engine::{DependencyGraph, EvalConfig, Scheduler, VertexId, VertexKind};
7use crate::interpreter::Interpreter;
8use crate::reference::{CellRef, Coord, RangeRef};
9use crate::traits::FunctionProvider;
10use crate::traits::{EvaluationContext, Resolver};
11use chrono::Timelike;
12use formualizer_common::{col_letters_from_1based, parse_a1_1based};
13use formualizer_parse::parser::ReferenceType;
14use formualizer_parse::{ASTNode, ExcelError, ExcelErrorKind, LiteralValue};
15use rayon::ThreadPoolBuilder;
16use rustc_hash::FxHashSet;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, Ordering};
19
20pub struct Engine<R> {
21    pub graph: DependencyGraph,
22    resolver: R,
23    pub config: EvalConfig,
24    thread_pool: Option<Arc<rayon::ThreadPool>>,
25    pub recalc_epoch: u64,
26    snapshot_id: std::sync::atomic::AtomicU64,
27    spill_mgr: ShimSpillManager,
28    /// Arrow-backed storage for sheet values (Phase A)
29    arrow_sheets: SheetStore,
30    /// True if any edit after bulk load; disables Arrow reads for parity
31    has_edited: bool,
32    /// Overlay compaction counter (Phase C instrumentation)
33    overlay_compactions: u64,
34    // Pass-scoped cache for Arrow used-row bounds per column
35    row_bounds_cache: std::sync::RwLock<Option<RowBoundsCache>>,
36    /// Staged formulas by sheet when `defer_graph_building` is enabled.
37    staged_formulas: std::collections::HashMap<String, Vec<(u32, u32, String)>>,
38}
39
40#[derive(Debug)]
41pub struct EvalResult {
42    pub computed_vertices: usize,
43    pub cycle_errors: usize,
44    pub elapsed: std::time::Duration,
45}
46
47/// Cached evaluation schedule that can be replayed across multiple recalculations.
48#[derive(Debug)]
49pub struct RecalcPlan {
50    schedule: crate::engine::Schedule,
51}
52
53impl RecalcPlan {
54    pub fn layer_count(&self) -> usize {
55        self.schedule.layers.len()
56    }
57}
58
59fn compute_criteria_mask(
60    view: &crate::arrow_store::ArrowRangeView<'_>,
61    col_in_view: usize,
62    pred: &crate::args::CriteriaPredicate,
63) -> Option<std::sync::Arc<arrow_array::BooleanArray>> {
64    use crate::compute_prelude::{boolean, cmp, concat_arrays};
65    use arrow::compute::kernels::comparison::{ilike, nilike};
66    use arrow_array::{
67        Array as _, ArrayRef, BooleanArray, Float64Array, StringArray, builder::BooleanBuilder,
68    };
69
70    // Helper: apply a numeric predicate to a single Float64Array chunk
71    fn apply_numeric_pred(
72        chunk: &Float64Array,
73        pred: &crate::args::CriteriaPredicate,
74    ) -> Option<BooleanArray> {
75        match pred {
76            crate::args::CriteriaPredicate::Gt(n) => {
77                cmp::gt(chunk, &Float64Array::new_scalar(*n)).ok()
78            }
79            crate::args::CriteriaPredicate::Ge(n) => {
80                cmp::gt_eq(chunk, &Float64Array::new_scalar(*n)).ok()
81            }
82            crate::args::CriteriaPredicate::Lt(n) => {
83                cmp::lt(chunk, &Float64Array::new_scalar(*n)).ok()
84            }
85            crate::args::CriteriaPredicate::Le(n) => {
86                cmp::lt_eq(chunk, &Float64Array::new_scalar(*n)).ok()
87            }
88            crate::args::CriteriaPredicate::Eq(v) => match v {
89                formualizer_common::LiteralValue::Number(x) => {
90                    cmp::eq(chunk, &Float64Array::new_scalar(*x)).ok()
91                }
92                formualizer_common::LiteralValue::Int(i) => {
93                    cmp::eq(chunk, &Float64Array::new_scalar(*i as f64)).ok()
94                }
95                _ => None,
96            },
97            crate::args::CriteriaPredicate::Ne(v) => match v {
98                formualizer_common::LiteralValue::Number(x) => {
99                    cmp::neq(chunk, &Float64Array::new_scalar(*x)).ok()
100                }
101                formualizer_common::LiteralValue::Int(i) => {
102                    cmp::neq(chunk, &Float64Array::new_scalar(*i as f64)).ok()
103                }
104                _ => None,
105            },
106            _ => None,
107        }
108    }
109
110    // Check if this is a numeric predicate that can be applied per-chunk
111    let is_numeric_pred = matches!(
112        pred,
113        crate::args::CriteriaPredicate::Gt(_)
114            | crate::args::CriteriaPredicate::Ge(_)
115            | crate::args::CriteriaPredicate::Lt(_)
116            | crate::args::CriteriaPredicate::Le(_)
117            | crate::args::CriteriaPredicate::Eq(formualizer_common::LiteralValue::Number(_))
118            | crate::args::CriteriaPredicate::Eq(formualizer_common::LiteralValue::Int(_))
119            | crate::args::CriteriaPredicate::Ne(formualizer_common::LiteralValue::Number(_))
120            | crate::args::CriteriaPredicate::Ne(formualizer_common::LiteralValue::Int(_))
121    );
122
123    // OPTIMIZED PATH: For numeric predicates, apply per-chunk and concatenate boolean masks.
124    // This avoids materializing the full numeric column (64-bit per element) and instead
125    // concatenates boolean masks (1-bit per element) - a 64x memory reduction.
126    if is_numeric_pred {
127        let mut bool_parts: Vec<BooleanArray> = Vec::new();
128        for (_rs, _rl, cols_seg) in view.numbers_slices() {
129            if col_in_view < cols_seg.len() {
130                let chunk = cols_seg[col_in_view].as_ref();
131                let mask = apply_numeric_pred(chunk, pred)?;
132                bool_parts.push(mask);
133            }
134        }
135
136        if bool_parts.is_empty() {
137            return None;
138        } else if bool_parts.len() == 1 {
139            return Some(std::sync::Arc::new(bool_parts.remove(0)));
140        } else {
141            // Concatenate boolean masks (much cheaper than concatenating Float64 arrays)
142            let anys: Vec<&dyn arrow_array::Array> = bool_parts
143                .iter()
144                .map(|a| a as &dyn arrow_array::Array)
145                .collect();
146            let conc: ArrayRef = concat_arrays(&anys).ok()?;
147            let ba = conc.as_any().downcast_ref::<BooleanArray>()?.clone();
148            return Some(std::sync::Arc::new(ba));
149        }
150    }
151
152    // FALLBACK PATH: For text predicates, use the pre-lowered text columns
153    // (which are already concatenated by the view)
154    let lowered_texts: Option<std::sync::Arc<StringArray>> = {
155        let cols = view.lowered_text_columns();
156        if col_in_view >= cols.len() {
157            None
158        } else {
159            let sa = cols[col_in_view]
160                .as_any()
161                .downcast_ref::<StringArray>()?
162                .clone();
163            Some(std::sync::Arc::new(sa))
164        }
165    };
166
167    let out = match pred {
168        crate::args::CriteriaPredicate::Eq(formualizer_common::LiteralValue::Text(t)) => {
169            let col = lowered_texts?;
170            let pat = StringArray::new_scalar(t.to_ascii_lowercase());
171            let mut m = ilike(col.as_ref(), &pat).ok()?;
172            if t.is_empty() {
173                // Treat nulls as equal to empty string
174                let mut bb = BooleanBuilder::with_capacity(col.len());
175                for i in 0..col.len() {
176                    bb.append_value(col.is_null(i));
177                }
178                let nulls = bb.finish();
179                m = boolean::or_kleene(&m, &nulls).ok()?;
180            }
181            m
182        }
183        crate::args::CriteriaPredicate::Ne(formualizer_common::LiteralValue::Text(t)) => {
184            let col = lowered_texts?;
185            let pat = StringArray::new_scalar(t.to_ascii_lowercase());
186            nilike(col.as_ref(), &pat).ok()?
187        }
188        crate::args::CriteriaPredicate::TextLike {
189            pattern,
190            case_insensitive,
191        } => {
192            let col = lowered_texts?;
193            let p = if *case_insensitive {
194                pattern.to_ascii_lowercase()
195            } else {
196                pattern.clone()
197            };
198            let lp = p.replace('*', "%").replace('?', "_");
199            let pat = StringArray::new_scalar(lp);
200            ilike(col.as_ref(), &pat).ok()?
201        }
202        // Numeric predicates are handled in the optimized path above
203        _ => return None,
204    };
205    Some(std::sync::Arc::new(out))
206}
207
208#[derive(Debug, Clone)]
209pub struct LayerInfo {
210    pub vertex_count: usize,
211    pub parallel_eligible: bool,
212    pub sample_cells: Vec<String>, // Sample of up to 5 cell addresses
213}
214
215#[derive(Debug, Clone)]
216pub struct EvalPlan {
217    pub total_vertices_to_evaluate: usize,
218    pub layers: Vec<LayerInfo>,
219    pub cycles_detected: usize,
220    pub dirty_count: usize,
221    pub volatile_count: usize,
222    pub parallel_enabled: bool,
223    pub estimated_parallel_layers: usize,
224    pub target_cells: Vec<String>,
225}
226
227impl<R> Engine<R>
228where
229    R: EvaluationContext,
230{
231    pub fn new(resolver: R, config: EvalConfig) -> Self {
232        crate::builtins::load_builtins();
233
234        // Initialize thread pool based on config
235        let thread_pool = if config.enable_parallel {
236            let mut builder = ThreadPoolBuilder::new();
237            if let Some(max_threads) = config.max_threads {
238                builder = builder.num_threads(max_threads);
239            }
240
241            match builder.build() {
242                Ok(pool) => Some(Arc::new(pool)),
243                Err(_) => {
244                    // Fall back to sequential evaluation if thread pool creation fails
245                    None
246                }
247            }
248        } else {
249            None
250        };
251
252        Self {
253            graph: DependencyGraph::new_with_config(config.clone()),
254            resolver,
255            config,
256            thread_pool,
257            recalc_epoch: 0,
258            snapshot_id: std::sync::atomic::AtomicU64::new(1),
259            spill_mgr: ShimSpillManager::default(),
260            arrow_sheets: SheetStore::default(),
261            has_edited: false,
262            overlay_compactions: 0,
263            row_bounds_cache: std::sync::RwLock::new(None),
264            staged_formulas: std::collections::HashMap::new(),
265        }
266    }
267
268    /// Create an Engine with a custom thread pool (for shared thread pool scenarios)
269    pub fn with_thread_pool(
270        resolver: R,
271        config: EvalConfig,
272        thread_pool: Arc<rayon::ThreadPool>,
273    ) -> Self {
274        crate::builtins::load_builtins();
275        Self {
276            graph: DependencyGraph::new_with_config(config.clone()),
277            resolver,
278            config,
279            thread_pool: Some(thread_pool),
280            recalc_epoch: 0,
281            snapshot_id: std::sync::atomic::AtomicU64::new(1),
282            spill_mgr: ShimSpillManager::default(),
283            arrow_sheets: SheetStore::default(),
284            has_edited: false,
285            overlay_compactions: 0,
286            row_bounds_cache: std::sync::RwLock::new(None),
287            staged_formulas: std::collections::HashMap::new(),
288        }
289    }
290
291    pub fn default_sheet_id(&self) -> SheetId {
292        self.graph.default_sheet_id()
293    }
294
295    pub fn default_sheet_name(&self) -> &str {
296        self.graph.default_sheet_name()
297    }
298
299    /// Update the workbook seed for deterministic RNGs in functions.
300    pub fn set_workbook_seed(&mut self, seed: u64) {
301        self.config.workbook_seed = seed;
302    }
303
304    /// Set the volatile level policy (Always/OnRecalc/OnOpen)
305    pub fn set_volatile_level(&mut self, level: crate::traits::VolatileLevel) {
306        self.config.volatile_level = level;
307    }
308
309    pub fn sheet_id(&self, name: &str) -> Option<SheetId> {
310        self.graph.sheet_id(name)
311    }
312
313    pub fn set_default_sheet_by_name(&mut self, name: &str) {
314        self.graph.set_default_sheet_by_name(name);
315    }
316
317    pub fn set_default_sheet_by_id(&mut self, id: SheetId) {
318        self.graph.set_default_sheet_by_id(id);
319    }
320
321    pub fn set_sheet_index_mode(&mut self, mode: crate::engine::SheetIndexMode) {
322        self.graph.set_sheet_index_mode(mode);
323    }
324
325    /// Mark data edited: bump snapshot and set edited flag
326    pub fn mark_data_edited(&mut self) {
327        self.snapshot_id
328            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
329        self.has_edited = true;
330    }
331
332    /// Access Arrow sheet store (read-only)
333    pub fn sheet_store(&self) -> &SheetStore {
334        &self.arrow_sheets
335    }
336
337    /// Access Arrow sheet store (mutable)
338    pub fn sheet_store_mut(&mut self) -> &mut SheetStore {
339        &mut self.arrow_sheets
340    }
341
342    /// Stage a formula text instead of inserting into the graph (used when deferring is enabled).
343    pub fn stage_formula_text(&mut self, sheet: &str, row: u32, col: u32, text: String) {
344        self.staged_formulas
345            .entry(sheet.to_string())
346            .or_default()
347            .push((row, col, text));
348    }
349
350    /// Get a staged formula text for a given cell if present (cloned).
351    pub fn get_staged_formula_text(&self, sheet: &str, row: u32, col: u32) -> Option<String> {
352        self.staged_formulas.get(sheet).and_then(|v| {
353            v.iter()
354                .find(|(r, c, _)| *r == row && *c == col)
355                .map(|(_, _, s)| s.clone())
356        })
357    }
358
359    /// Build graph for all staged formulas.
360    pub fn build_graph_all(&mut self) -> Result<(), formualizer_parse::ExcelError> {
361        if self.staged_formulas.is_empty() {
362            return Ok(());
363        }
364        // Take staged formulas before borrowing graph via builder
365        let staged = std::mem::take(&mut self.staged_formulas);
366        let mut builder = self.begin_bulk_ingest();
367        for (sheet, entries) in staged {
368            let sid = builder.add_sheet(&sheet);
369            // Parse with small cache for shared formulas
370            let mut cache: rustc_hash::FxHashMap<String, formualizer_parse::ASTNode> =
371                rustc_hash::FxHashMap::default();
372            cache.reserve(4096);
373            for (row, col, txt) in entries {
374                let key = if txt.starts_with('=') {
375                    txt
376                } else {
377                    format!("={txt}")
378                };
379                let ast = if let Some(p) = cache.get(&key) {
380                    p.clone()
381                } else {
382                    let parsed = formualizer_parse::parser::parse(&key).map_err(|e| {
383                        formualizer_parse::ExcelError::new(formualizer_parse::ExcelErrorKind::Value)
384                            .with_message(e.to_string())
385                    })?;
386                    cache.insert(key.clone(), parsed.clone());
387                    parsed
388                };
389                builder.add_formulas(sid, std::iter::once((row, col, ast)));
390            }
391        }
392        let _ = builder.finish();
393        Ok(())
394    }
395
396    /// Build graph for specific sheets (consuming only those staged entries).
397    pub fn build_graph_for_sheets<'a, I: IntoIterator<Item = &'a str>>(
398        &mut self,
399        sheets: I,
400    ) -> Result<(), formualizer_parse::ExcelError> {
401        let mut any = false;
402        // Collect entries first to avoid aliasing self while builder borrows graph
403        struct SheetFormulas<'s> {
404            sheet: &'s str,
405            entries: Vec<(u32, u32, String)>,
406        }
407        let mut collected: Vec<SheetFormulas<'_>> = Vec::new();
408        for s in sheets {
409            if let Some(entries) = self.staged_formulas.remove(s) {
410                collected.push(SheetFormulas { sheet: s, entries });
411            }
412        }
413        let mut builder = self.begin_bulk_ingest();
414        // small parse cache per call
415        let mut cache: rustc_hash::FxHashMap<String, formualizer_parse::ASTNode> =
416            rustc_hash::FxHashMap::default();
417        cache.reserve(4096);
418        for SheetFormulas { sheet: s, entries } in collected.into_iter() {
419            any = true;
420            let sid = builder.add_sheet(s);
421            for (row, col, txt) in entries {
422                let key = if txt.starts_with('=') {
423                    txt
424                } else {
425                    format!("={txt}")
426                };
427                let ast = if let Some(p) = cache.get(&key) {
428                    p.clone()
429                } else {
430                    let parsed = formualizer_parse::parser::parse(&key).map_err(|e| {
431                        formualizer_parse::ExcelError::new(formualizer_parse::ExcelErrorKind::Value)
432                            .with_message(e.to_string())
433                    })?;
434                    cache.insert(key.clone(), parsed.clone());
435                    parsed
436                };
437                builder.add_formulas(sid, std::iter::once((row, col, ast)));
438            }
439        }
440        if any {
441            let _ = builder.finish();
442        }
443        Ok(())
444    }
445
446    /// Begin bulk Arrow ingest for base values (Phase A)
447    pub fn begin_bulk_ingest_arrow(
448        &mut self,
449    ) -> crate::engine::arrow_ingest::ArrowBulkIngestBuilder<'_, R> {
450        crate::engine::arrow_ingest::ArrowBulkIngestBuilder::new(self)
451    }
452
453    /// Begin bulk updates to Arrow store (Phase C)
454    pub fn begin_bulk_update_arrow(
455        &mut self,
456    ) -> crate::engine::arrow_ingest::ArrowBulkUpdateBuilder<'_, R> {
457        crate::engine::arrow_ingest::ArrowBulkUpdateBuilder::new(self)
458    }
459
460    /// Insert rows (1-based) and mirror into Arrow store when enabled
461    pub fn insert_rows(
462        &mut self,
463        sheet: &str,
464        before: u32,
465        count: u32,
466    ) -> Result<crate::engine::graph::editor::vertex_editor::ShiftSummary, crate::engine::EditorError>
467    {
468        use crate::engine::graph::editor::vertex_editor::VertexEditor;
469        let sheet_id = self.graph.sheet_id(sheet).ok_or(
470            crate::engine::graph::editor::vertex_editor::EditorError::InvalidName {
471                name: sheet.to_string(),
472                reason: "Unknown sheet".to_string(),
473            },
474        )?;
475        let mut editor = VertexEditor::new(&mut self.graph);
476        let summary = editor.insert_rows(sheet_id, before, count)?;
477        if let Some(asheet) = self.arrow_sheets.sheet_mut(sheet) {
478            let before0 = before.saturating_sub(1) as usize;
479            asheet.insert_rows(before0, count as usize);
480        }
481        self.snapshot_id
482            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
483        self.has_edited = true;
484        Ok(summary)
485    }
486
487    /// Delete rows (1-based) and mirror into Arrow store when enabled
488    pub fn delete_rows(
489        &mut self,
490        sheet: &str,
491        start: u32,
492        count: u32,
493    ) -> Result<crate::engine::graph::editor::vertex_editor::ShiftSummary, crate::engine::EditorError>
494    {
495        use crate::engine::graph::editor::vertex_editor::VertexEditor;
496        let sheet_id = self.graph.sheet_id(sheet).ok_or(
497            crate::engine::graph::editor::vertex_editor::EditorError::InvalidName {
498                name: sheet.to_string(),
499                reason: "Unknown sheet".to_string(),
500            },
501        )?;
502        let mut editor = VertexEditor::new(&mut self.graph);
503        let summary = editor.delete_rows(sheet_id, start, count)?;
504        if let Some(asheet) = self.arrow_sheets.sheet_mut(sheet) {
505            let start0 = start.saturating_sub(1) as usize;
506            asheet.delete_rows(start0, count as usize);
507        }
508        self.snapshot_id
509            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
510        self.has_edited = true;
511        Ok(summary)
512    }
513
514    /// Insert columns (1-based) and mirror into Arrow store when enabled
515    pub fn insert_columns(
516        &mut self,
517        sheet: &str,
518        before: u32,
519        count: u32,
520    ) -> Result<crate::engine::graph::editor::vertex_editor::ShiftSummary, crate::engine::EditorError>
521    {
522        use crate::engine::graph::editor::vertex_editor::VertexEditor;
523        let sheet_id = self.graph.sheet_id(sheet).ok_or(
524            crate::engine::graph::editor::vertex_editor::EditorError::InvalidName {
525                name: sheet.to_string(),
526                reason: "Unknown sheet".to_string(),
527            },
528        )?;
529        let mut editor = VertexEditor::new(&mut self.graph);
530        let summary = editor.insert_columns(sheet_id, before, count)?;
531        if let Some(asheet) = self.arrow_sheets.sheet_mut(sheet) {
532            let before0 = before.saturating_sub(1) as usize;
533            asheet.insert_columns(before0, count as usize);
534        }
535        self.snapshot_id
536            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
537        self.has_edited = true;
538        Ok(summary)
539    }
540
541    /// Delete columns (1-based) and mirror into Arrow store when enabled
542    pub fn delete_columns(
543        &mut self,
544        sheet: &str,
545        start: u32,
546        count: u32,
547    ) -> Result<crate::engine::graph::editor::vertex_editor::ShiftSummary, crate::engine::EditorError>
548    {
549        use crate::engine::graph::editor::vertex_editor::VertexEditor;
550        let sheet_id = self.graph.sheet_id(sheet).ok_or(
551            crate::engine::graph::editor::vertex_editor::EditorError::InvalidName {
552                name: sheet.to_string(),
553                reason: "Unknown sheet".to_string(),
554            },
555        )?;
556        let mut editor = VertexEditor::new(&mut self.graph);
557        let summary = editor.delete_columns(sheet_id, start, count)?;
558        if let Some(asheet) = self.arrow_sheets.sheet_mut(sheet) {
559            let start0 = start.saturating_sub(1) as usize;
560            asheet.delete_columns(start0, count as usize);
561        }
562        self.snapshot_id
563            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
564        self.has_edited = true;
565        Ok(summary)
566    }
567    /// Arrow-backed used row bounds across a column span (1-based inclusive cols).
568    fn arrow_used_row_bounds(
569        &self,
570        sheet: &str,
571        start_col: u32,
572        end_col: u32,
573    ) -> Option<(u32, u32)> {
574        let a = self.sheet_store().sheet(sheet)?;
575        if a.columns.is_empty() {
576            return None;
577        }
578        let sc0 = start_col.saturating_sub(1) as usize;
579        let ec0 = end_col.saturating_sub(1) as usize;
580        let col_hi = a.columns.len().saturating_sub(1);
581        if sc0 > col_hi {
582            return None;
583        }
584        let ec0 = ec0.min(col_hi);
585        // Pass-scoped cache with snapshot guard
586        let snap = self.data_snapshot_id();
587        let mut min_r0: Option<usize> = None;
588        for ci in sc0..=ec0 {
589            let sheet_id = self.graph.sheet_id(sheet)?;
590            if let Some((Some(mv), _)) = self.row_bounds_cache.read().ok().and_then(|g| {
591                g.as_ref()
592                    .and_then(|c| c.get_row_bounds(sheet_id, ci, snap))
593            }) {
594                let mv = mv as usize;
595                min_r0 = Some(min_r0.map(|m| m.min(mv)).unwrap_or(mv));
596                continue;
597            }
598            // Compute and store
599            let (min_c, max_c) = Self::scan_column_used_bounds(a, ci);
600            if let Ok(mut g) = self.row_bounds_cache.write() {
601                g.get_or_insert_with(|| RowBoundsCache::new(snap))
602                    .put_row_bounds(sheet_id, ci, snap, (min_c, max_c));
603            }
604            if let Some(m) = min_c {
605                min_r0 = Some(min_r0.map(|mm| mm.min(m as usize)).unwrap_or(m as usize));
606            }
607        }
608        min_r0?;
609        let mut max_r0: Option<usize> = None;
610        for ci in sc0..=ec0 {
611            let sheet_id = self.graph.sheet_id(sheet)?;
612            if let Some((_, Some(mv))) = self.row_bounds_cache.read().ok().and_then(|g| {
613                g.as_ref()
614                    .and_then(|c| c.get_row_bounds(sheet_id, ci, snap))
615            }) {
616                let mv = mv as usize;
617                max_r0 = Some(max_r0.map(|m| m.max(mv)).unwrap_or(mv));
618                continue;
619            }
620            let (_min_c, max_c) = Self::scan_column_used_bounds(a, ci);
621            if let Ok(mut g) = self.row_bounds_cache.write() {
622                g.get_or_insert_with(|| RowBoundsCache::new(snap))
623                    .put_row_bounds(sheet_id, ci, snap, (_min_c, max_c));
624            }
625            if let Some(m) = max_c {
626                max_r0 = Some(max_r0.map(|mm| mm.max(m as usize)).unwrap_or(m as usize));
627            }
628        }
629        match (min_r0, max_r0) {
630            (Some(a0), Some(b0)) => Some(((a0 as u32) + 1, (b0 as u32) + 1)),
631            _ => None,
632        }
633    }
634
635    fn scan_column_used_bounds(
636        a: &crate::arrow_store::ArrowSheet,
637        ci: usize,
638    ) -> (Option<u32>, Option<u32>) {
639        let col = &a.columns[ci];
640        // Min
641        let mut min_r0: Option<u32> = None;
642        for (chunk_idx, chunk) in col.chunks.iter().enumerate() {
643            let tags = chunk.type_tag.values();
644            for (off, &t) in tags.iter().enumerate() {
645                let overlay_non_empty = chunk
646                    .overlay
647                    .get(off)
648                    .map(|ov| !matches!(ov, crate::arrow_store::OverlayValue::Empty))
649                    .unwrap_or(false);
650                if overlay_non_empty || t != crate::arrow_store::TypeTag::Empty as u8 {
651                    let row0 = a.chunk_starts[chunk_idx] + off;
652                    min_r0 = Some(row0 as u32);
653                    break;
654                }
655            }
656            if min_r0.is_some() {
657                break;
658            }
659        }
660        // Max
661        let mut max_r0: Option<u32> = None;
662        for (chunk_rel, chunk) in col.chunks.iter().enumerate().rev() {
663            let chunk_idx = chunk_rel;
664            let tags = chunk.type_tag.values();
665            for (rev_idx, &t) in tags.iter().enumerate().rev() {
666                let overlay_non_empty = chunk
667                    .overlay
668                    .get(rev_idx)
669                    .map(|ov| !matches!(ov, crate::arrow_store::OverlayValue::Empty))
670                    .unwrap_or(false);
671                if overlay_non_empty || t != crate::arrow_store::TypeTag::Empty as u8 {
672                    let row0 = a.chunk_starts[chunk_idx] + rev_idx;
673                    max_r0 = Some(row0 as u32);
674                    break;
675                }
676            }
677            if max_r0.is_some() {
678                break;
679            }
680        }
681        (min_r0, max_r0)
682    }
683
684    /// Arrow-backed used column bounds across a row span (1-based inclusive rows).
685    fn arrow_used_col_bounds(
686        &self,
687        sheet: &str,
688        start_row: u32,
689        end_row: u32,
690    ) -> Option<(u32, u32)> {
691        let a = self.sheet_store().sheet(sheet)?;
692        if a.columns.is_empty() {
693            return None;
694        }
695        let sr0 = start_row.saturating_sub(1) as usize;
696        let er0 = end_row.saturating_sub(1) as usize;
697        if sr0 > er0 {
698            return None;
699        }
700        // Map start/end rows into chunk ranges
701        // We will scan each column for any non-empty within [sr0..=er0]
702        let mut min_c0: Option<usize> = None;
703        let mut max_c0: Option<usize> = None;
704        // Precompute chunk bounds for row range
705        for (ci, col) in a.columns.iter().enumerate() {
706            // Skip columns with no chunks (shouldn't happen)
707            if col.chunks.is_empty() {
708                continue;
709            }
710            let mut any_in_range = false;
711            for (chunk_idx, chunk) in col.chunks.iter().enumerate() {
712                let chunk_start = a.chunk_starts[chunk_idx];
713                let chunk_len = chunk.type_tag.len();
714                let chunk_end = chunk_start + chunk_len.saturating_sub(1);
715                // check intersection
716                if sr0 > chunk_end || er0 < chunk_start {
717                    continue;
718                }
719                let start_off = sr0.max(chunk_start) - chunk_start;
720                let end_off = er0.min(chunk_end) - chunk_start;
721                let tags = chunk.type_tag.values();
722                for off in start_off..=end_off {
723                    let overlay_non_empty = chunk
724                        .overlay
725                        .get(off)
726                        .map(|ov| !matches!(ov, crate::arrow_store::OverlayValue::Empty))
727                        .unwrap_or(false);
728                    if overlay_non_empty || tags[off] != crate::arrow_store::TypeTag::Empty as u8 {
729                        any_in_range = true;
730                        break;
731                    }
732                }
733                if any_in_range {
734                    break;
735                }
736            }
737            if any_in_range {
738                min_c0 = Some(min_c0.map(|m| m.min(ci)).unwrap_or(ci));
739                max_c0 = Some(max_c0.map(|m| m.max(ci)).unwrap_or(ci));
740            }
741        }
742        match (min_c0, max_c0) {
743            (Some(a0), Some(b0)) => Some(((a0 as u32) + 1, (b0 as u32) + 1)),
744            _ => None,
745        }
746    }
747
748    /// Mirror a single cell value into the Arrow overlay if enabled.
749    /// Handles capacity growth, per-chunk overlay set, and heuristic compaction.
750    fn mirror_value_to_overlay(&mut self, sheet: &str, row: u32, col: u32, value: &LiteralValue) {
751        if !(self.config.arrow_storage_enabled && self.config.delta_overlay_enabled) {
752            return;
753        }
754        let asheet = match self.arrow_sheets.sheet_mut(sheet) {
755            Some(s) => s,
756            None => return,
757        };
758        let row0 = row.saturating_sub(1) as usize;
759        let col0 = col.saturating_sub(1) as usize;
760        if col0 >= asheet.columns.len() {
761            return;
762        }
763        if row0 >= asheet.nrows as usize {
764            asheet.ensure_row_capacity(row0 + 1);
765        }
766        if let Some((ch_idx, in_off)) = asheet.chunk_of_row(row0) {
767            use crate::arrow_store::OverlayValue;
768            let ov = match value {
769                LiteralValue::Empty => OverlayValue::Empty,
770                LiteralValue::Int(i) => OverlayValue::Number(*i as f64),
771                LiteralValue::Number(n) => OverlayValue::Number(*n),
772                LiteralValue::Boolean(b) => OverlayValue::Boolean(*b),
773                LiteralValue::Text(s) => OverlayValue::Text(std::sync::Arc::from(s.clone())),
774                LiteralValue::Error(e) => {
775                    OverlayValue::Error(crate::arrow_store::map_error_code(e.kind))
776                }
777                LiteralValue::Date(d) => {
778                    let dt = d.and_hms_opt(0, 0, 0).unwrap();
779                    let serial = crate::builtins::datetime::datetime_to_serial_for(
780                        self.config.date_system,
781                        &dt,
782                    );
783                    OverlayValue::Number(serial)
784                }
785                LiteralValue::DateTime(dt) => {
786                    let serial = crate::builtins::datetime::datetime_to_serial_for(
787                        self.config.date_system,
788                        dt,
789                    );
790                    OverlayValue::Number(serial)
791                }
792                LiteralValue::Time(t) => {
793                    let serial = t.num_seconds_from_midnight() as f64 / 86_400.0;
794                    OverlayValue::Number(serial)
795                }
796                LiteralValue::Duration(d) => {
797                    let serial = d.num_seconds() as f64 / 86_400.0;
798                    OverlayValue::Number(serial)
799                }
800                LiteralValue::Pending => OverlayValue::Pending,
801                LiteralValue::Array(_) => OverlayValue::Error(crate::arrow_store::map_error_code(
802                    formualizer_common::ExcelErrorKind::Value,
803                )),
804            };
805            let colref = &mut asheet.columns[col0];
806            let ch = &mut colref.chunks[ch_idx];
807            ch.overlay.set(in_off, ov);
808            // Heuristic compaction: > len/50 or > 1024
809            let abs_threshold = 1024usize;
810            let frac_den = 50usize;
811            if asheet.maybe_compact_chunk(col0, ch_idx, abs_threshold, frac_den) {
812                self.overlay_compactions = self.overlay_compactions.saturating_add(1);
813            }
814        }
815    }
816
817    fn mirror_vertex_value_to_overlay(&mut self, vertex_id: VertexId, value: &LiteralValue) {
818        if !(self.config.arrow_storage_enabled
819            && self.config.delta_overlay_enabled
820            && self.config.write_formula_overlay_enabled)
821        {
822            return;
823        }
824        if !matches!(
825            self.graph.get_vertex_kind(vertex_id),
826            VertexKind::FormulaScalar | VertexKind::FormulaArray
827        ) {
828            return;
829        }
830        let Some(cell) = self.graph.get_cell_ref(vertex_id) else {
831            return;
832        };
833        let sheet_name = self.graph.sheet_name(cell.sheet_id).to_string();
834        self.mirror_value_to_overlay(
835            &sheet_name,
836            cell.coord.row() + 1,
837            cell.coord.col() + 1,
838            value,
839        );
840    }
841
842    fn resolve_sheet_locator_for_write(
843        &mut self,
844        loc: formualizer_common::SheetLocator<'_>,
845        current_sheet: &str,
846    ) -> Result<SheetId, ExcelError> {
847        Ok(match loc {
848            formualizer_common::SheetLocator::Id(id) => id,
849            formualizer_common::SheetLocator::Name(name) => self.graph.sheet_id_mut(name.as_ref()),
850            formualizer_common::SheetLocator::Current => self.graph.sheet_id_mut(current_sheet),
851        })
852    }
853
854    fn resolve_sheet_locator_for_read(
855        &self,
856        loc: formualizer_common::SheetLocator<'_>,
857        current_sheet: &str,
858    ) -> Result<SheetId, ExcelError> {
859        match loc {
860            formualizer_common::SheetLocator::Id(id) => Ok(id),
861            formualizer_common::SheetLocator::Name(name) => self
862                .graph
863                .sheet_id(name.as_ref())
864                .ok_or_else(|| ExcelError::new(ExcelErrorKind::Ref)),
865            formualizer_common::SheetLocator::Current => self
866                .graph
867                .sheet_id(current_sheet)
868                .ok_or_else(|| ExcelError::new(ExcelErrorKind::Ref)),
869        }
870    }
871
872    /// Set a cell value
873    pub fn set_cell_value(
874        &mut self,
875        sheet: &str,
876        row: u32,
877        col: u32,
878        value: LiteralValue,
879    ) -> Result<(), ExcelError> {
880        self.graph.set_cell_value(sheet, row, col, value.clone())?;
881        // Mirror into Arrow overlay when enabled
882        self.mirror_value_to_overlay(sheet, row, col, &value);
883        // Advance snapshot to reflect external mutation
884        self.snapshot_id
885            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
886        self.has_edited = true;
887        Ok(())
888    }
889
890    pub fn set_cell_value_ref(
891        &mut self,
892        cell: formualizer_common::SheetCellRef<'_>,
893        current_sheet: &str,
894        value: LiteralValue,
895    ) -> Result<(), ExcelError> {
896        let owned = cell.into_owned();
897        let sheet_id = self.resolve_sheet_locator_for_write(owned.sheet, current_sheet)?;
898        let sheet_name = self.graph.sheet_name(sheet_id).to_string();
899        self.set_cell_value(
900            &sheet_name,
901            owned.coord.row() + 1,
902            owned.coord.col() + 1,
903            value,
904        )
905    }
906
907    pub fn set_cell_formula_ref(
908        &mut self,
909        cell: formualizer_common::SheetCellRef<'_>,
910        current_sheet: &str,
911        ast: ASTNode,
912    ) -> Result<(), ExcelError> {
913        let owned = cell.into_owned();
914        let sheet_id = self.resolve_sheet_locator_for_write(owned.sheet, current_sheet)?;
915        let sheet_name = self.graph.sheet_name(sheet_id).to_string();
916        self.set_cell_formula(
917            &sheet_name,
918            owned.coord.row() + 1,
919            owned.coord.col() + 1,
920            ast,
921        )
922    }
923
924    pub fn get_cell_value_ref(
925        &self,
926        cell: formualizer_common::SheetCellRef<'_>,
927        current_sheet: &str,
928    ) -> Result<Option<LiteralValue>, ExcelError> {
929        let owned = cell.into_owned();
930        let sheet_id = self.resolve_sheet_locator_for_read(owned.sheet, current_sheet)?;
931        let sheet_name = self.graph.sheet_name(sheet_id);
932        Ok(self.get_cell_value(sheet_name, owned.coord.row() + 1, owned.coord.col() + 1))
933    }
934
935    pub fn resolve_range_view_sheet_ref<'c>(
936        &'c self,
937        r: &formualizer_common::SheetRef<'_>,
938        current_sheet: &str,
939    ) -> Result<RangeView<'c>, ExcelError> {
940        use formualizer_common::SheetLocator;
941
942        let sheet_to_opt_name = |loc: SheetLocator<'_>| -> Result<Option<String>, ExcelError> {
943            match loc {
944                SheetLocator::Current => Ok(None),
945                SheetLocator::Name(name) => Ok(Some(name.as_ref().to_string())),
946                SheetLocator::Id(id) => Ok(Some(self.graph.sheet_name(id).to_string())),
947            }
948        };
949
950        let rt = match r {
951            formualizer_common::SheetRef::Cell(cell) => ReferenceType::Cell {
952                sheet: sheet_to_opt_name(cell.sheet.clone())?,
953                row: cell.coord.row() + 1,
954                col: cell.coord.col() + 1,
955            },
956            formualizer_common::SheetRef::Range(range) => ReferenceType::Range {
957                sheet: sheet_to_opt_name(range.sheet.clone())?,
958                start_row: range.start_row.map(|b| b.index + 1),
959                start_col: range.start_col.map(|b| b.index + 1),
960                end_row: range.end_row.map(|b| b.index + 1),
961                end_col: range.end_col.map(|b| b.index + 1),
962            },
963        };
964
965        crate::traits::EvaluationContext::resolve_range_view(self, &rt, current_sheet)
966    }
967
968    /// Set a cell formula
969    pub fn set_cell_formula(
970        &mut self,
971        sheet: &str,
972        row: u32,
973        col: u32,
974        ast: ASTNode,
975    ) -> Result<(), ExcelError> {
976        let volatile = self.is_ast_volatile_with_provider(&ast);
977        self.graph
978            .set_cell_formula_with_volatility(sheet, row, col, ast, volatile)?;
979        // Advance snapshot to reflect external mutation
980        self.snapshot_id
981            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
982        self.has_edited = true;
983        Ok(())
984    }
985
986    /// Bulk set many formulas on a sheet. Skips per-cell snapshot bumping and minimizes edge rebuilds.
987    pub fn bulk_set_formulas<I>(&mut self, sheet: &str, items: I) -> Result<usize, ExcelError>
988    where
989        I: IntoIterator<Item = (u32, u32, ASTNode)>,
990    {
991        let collected: Vec<(u32, u32, ASTNode)> = items.into_iter().collect();
992        let vol_flags: Vec<bool> = collected
993            .iter()
994            .map(|(_, _, ast)| self.is_ast_volatile_with_provider(ast))
995            .collect();
996        let n = self
997            .graph
998            .bulk_set_formulas_with_volatility(sheet, collected, vol_flags)?;
999        // Single snapshot bump after batch
1000        self.snapshot_id
1001            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1002        if n > 0 {
1003            self.has_edited = true;
1004        }
1005        Ok(n)
1006    }
1007
1008    /// Get a cell value
1009    pub fn get_cell_value(&self, sheet: &str, row: u32, col: u32) -> Option<LiteralValue> {
1010        // Prefer Arrow for non-formula cells. For formula cells, use graph value.
1011        if let Some(sheet_id) = self.graph.sheet_id(sheet) {
1012            // If a formula exists at this address, return graph value
1013            let coord = Coord::from_excel(row, col, true, true);
1014            let addr = CellRef::new(sheet_id, coord);
1015            if let Some(vid) = self.graph.get_vertex_id_for_address(&addr) {
1016                match self.graph.get_vertex_kind(*vid) {
1017                    VertexKind::FormulaScalar | VertexKind::FormulaArray => {
1018                        return self.graph.get_cell_value(sheet, row, col);
1019                    }
1020                    _ => {}
1021                }
1022            }
1023        }
1024        if let Some(asheet) = self.sheet_store().sheet(sheet) {
1025            let r0 = row.saturating_sub(1) as usize;
1026            let c0 = col.saturating_sub(1) as usize;
1027            let av = asheet.range_view(r0, c0, r0, c0);
1028            let v = av.get_cell(0, 0);
1029            return Some(v);
1030        }
1031        self.graph.get_cell_value(sheet, row, col)
1032    }
1033
1034    /// Get formula AST (if any) and current stored value for a cell
1035    pub fn get_cell(
1036        &self,
1037        sheet: &str,
1038        row: u32,
1039        col: u32,
1040    ) -> Option<(Option<formualizer_parse::ASTNode>, Option<LiteralValue>)> {
1041        let v = self.get_cell_value(sheet, row, col);
1042        let sheet_id = self.graph.sheet_id(sheet)?;
1043        let coord = Coord::from_excel(row, col, true, true);
1044        let cell = CellRef::new(sheet_id, coord);
1045        let vid = self.graph.get_vertex_for_cell(&cell)?;
1046        let ast = self.graph.get_formula_id(vid).and_then(|ast_id| {
1047            self.graph
1048                .data_store()
1049                .retrieve_ast(ast_id, self.graph.sheet_reg())
1050        });
1051        Some((ast, v))
1052    }
1053
1054    /// Begin batch operations - defer CSR rebuilds for better performance
1055    pub fn begin_batch(&mut self) {
1056        self.graph.begin_batch();
1057    }
1058
1059    /// End batch operations and trigger CSR rebuild
1060    pub fn end_batch(&mut self) {
1061        self.graph.end_batch();
1062    }
1063
1064    /// Evaluate a single vertex.
1065    /// This is the core of the sequential evaluation logic for Milestone 3.1.
1066    pub fn evaluate_vertex(&mut self, vertex_id: VertexId) -> Result<LiteralValue, ExcelError> {
1067        // Check if vertex exists
1068        if !self.graph.vertex_exists(vertex_id) {
1069            return Err(ExcelError::new(formualizer_common::ExcelErrorKind::Ref)
1070                .with_message(format!("Vertex not found: {vertex_id:?}")));
1071        }
1072
1073        // Get vertex kind and check if it needs evaluation
1074        let kind = self.graph.get_vertex_kind(vertex_id);
1075        let sheet_id = self.graph.get_vertex_sheet_id(vertex_id);
1076
1077        let ast_id = match kind {
1078            VertexKind::FormulaScalar | VertexKind::FormulaArray => {
1079                if let Some(ast_id) = self.graph.get_formula_id(vertex_id) {
1080                    ast_id
1081                } else {
1082                    return Ok(LiteralValue::Int(0));
1083                }
1084            }
1085            VertexKind::Empty | VertexKind::Cell => {
1086                // Check if there's a value stored
1087                if let Some(value) = self.graph.get_value(vertex_id) {
1088                    return Ok(value.clone());
1089                } else {
1090                    return Ok(LiteralValue::Int(0)); // Empty cells evaluate to 0
1091                }
1092            }
1093            VertexKind::NamedScalar => {
1094                let value = self.evaluate_named_scalar(vertex_id, sheet_id)?;
1095                return Ok(value);
1096            }
1097            VertexKind::NamedArray => {
1098                return Err(ExcelError::new(ExcelErrorKind::NImpl)
1099                    .with_message("Array named ranges not yet implemented"));
1100            }
1101            VertexKind::InfiniteRange | VertexKind::Range | VertexKind::External => {
1102                // Not directly evaluatable here; return stored or 0
1103                if let Some(value) = self.graph.get_value(vertex_id) {
1104                    return Ok(value.clone());
1105                } else {
1106                    return Ok(LiteralValue::Int(0));
1107                }
1108            }
1109        };
1110
1111        // The interpreter uses a reference to the engine as the context.
1112        let sheet_name = self.graph.sheet_name(sheet_id);
1113        let cell_ref = self
1114            .graph
1115            .get_cell_ref(vertex_id)
1116            .expect("cell ref for vertex");
1117        let interpreter = Interpreter::new_with_cell(self, sheet_name, cell_ref);
1118
1119        let result =
1120            interpreter.evaluate_arena_ast(ast_id, self.graph.data_store(), self.graph.sheet_reg());
1121
1122        // If array result, perform spill from the anchor cell
1123        match result {
1124            Ok(LiteralValue::Array(rows)) => {
1125                // Update kind to FormulaArray for tracking
1126                self.graph
1127                    .set_kind(vertex_id, crate::engine::vertex::VertexKind::FormulaArray);
1128                // Build target cells rectangle starting from anchor
1129                let anchor = self
1130                    .graph
1131                    .get_cell_ref(vertex_id)
1132                    .expect("cell ref for vertex");
1133                let sheet_id = anchor.sheet_id;
1134                let h = rows.len() as u32;
1135                let w = rows.first().map(|r| r.len()).unwrap_or(0) as u32;
1136                // Bounds check to avoid out-of-range writes (align to AbsCoord capacity)
1137                const PACKED_MAX_ROW: u32 = 1_048_575; // 20-bit max
1138                const PACKED_MAX_COL: u32 = 16_383; // 14-bit max
1139                let end_row = anchor.coord.row().saturating_add(h).saturating_sub(1);
1140                let end_col = anchor.coord.col().saturating_add(w).saturating_sub(1);
1141                if end_row > PACKED_MAX_ROW || end_col > PACKED_MAX_COL {
1142                    let spill_err = ExcelError::new(ExcelErrorKind::Spill)
1143                        .with_message("Spill exceeds sheet bounds")
1144                        .with_extra(formualizer_common::ExcelErrorExtra::Spill {
1145                            expected_rows: h,
1146                            expected_cols: w,
1147                        });
1148                    let spill_val = LiteralValue::Error(spill_err.clone());
1149                    self.graph.update_vertex_value(vertex_id, spill_val.clone());
1150                    return Ok(spill_val);
1151                }
1152                let mut targets = Vec::new();
1153                for r in 0..h {
1154                    for c in 0..w {
1155                        targets.push(self.graph.make_cell_ref_internal(
1156                            sheet_id,
1157                            anchor.coord.row() + r,
1158                            anchor.coord.col() + c,
1159                        ));
1160                    }
1161                }
1162
1163                // Plan spill via spill manager shim
1164                match self.spill_mgr.reserve(
1165                    vertex_id,
1166                    anchor,
1167                    SpillShape { rows: h, cols: w },
1168                    SpillMeta {
1169                        epoch: self.recalc_epoch,
1170                        config: self.config.spill,
1171                    },
1172                ) {
1173                    Ok(()) => {
1174                        // Commit: write values to grid
1175                        // Default conflict policy is Error + FirstWins; reserve() enforces in-flight locks
1176                        // and plan_spill_region enforces overlap with committed formulas/spills/values.
1177                        if let Err(e) =
1178                            self.commit_spill_and_mirror(vertex_id, &targets, rows.clone())
1179                        {
1180                            // If commit fails, mark as error
1181                            self.graph
1182                                .update_vertex_value(vertex_id, LiteralValue::Error(e.clone()));
1183                            return Ok(LiteralValue::Error(e));
1184                        }
1185                        // Anchor shows the top-left value, like Excel
1186                        let top_left = rows
1187                            .first()
1188                            .and_then(|r| r.first())
1189                            .cloned()
1190                            .unwrap_or(LiteralValue::Empty);
1191                        self.graph.update_vertex_value(vertex_id, top_left.clone());
1192                        Ok(top_left)
1193                    }
1194                    Err(e) => {
1195                        let spill_err = ExcelError::new(ExcelErrorKind::Spill)
1196                            .with_message(e.message.unwrap_or_else(|| "Spill blocked".to_string()))
1197                            .with_extra(formualizer_common::ExcelErrorExtra::Spill {
1198                                expected_rows: h,
1199                                expected_cols: w,
1200                            });
1201                        let spill_val = LiteralValue::Error(spill_err.clone());
1202                        self.graph.update_vertex_value(vertex_id, spill_val.clone());
1203                        Ok(spill_val)
1204                    }
1205                }
1206            }
1207            Ok(other) => {
1208                // Scalar result: store value and ensure any previous spill is cleared
1209                self.graph.clear_spill_region(vertex_id);
1210                self.graph.update_vertex_value(vertex_id, other.clone());
1211                // Optionally mirror into Arrow overlay for Arrow-backed reads
1212                if self.config.arrow_storage_enabled
1213                    && self.config.delta_overlay_enabled
1214                    && self.config.write_formula_overlay_enabled
1215                {
1216                    let anchor = self
1217                        .graph
1218                        .get_cell_ref(vertex_id)
1219                        .expect("cell ref for vertex");
1220                    let sheet_name = self.graph.sheet_name(anchor.sheet_id).to_string();
1221                    self.mirror_value_to_overlay(
1222                        &sheet_name,
1223                        anchor.coord.row() + 1,
1224                        anchor.coord.col() + 1,
1225                        &other,
1226                    );
1227                }
1228                Ok(other)
1229            }
1230            Err(e) => {
1231                // Runtime Excel error: store as a cell value instead of propagating
1232                // as an exception so bulk eval paths don't fail the whole pass.
1233                self.graph.clear_spill_region(vertex_id);
1234                let err_val = LiteralValue::Error(e.clone());
1235                self.graph.update_vertex_value(vertex_id, err_val.clone());
1236                if self.config.arrow_storage_enabled
1237                    && self.config.delta_overlay_enabled
1238                    && self.config.write_formula_overlay_enabled
1239                {
1240                    let anchor = self
1241                        .graph
1242                        .get_cell_ref(vertex_id)
1243                        .expect("cell ref for vertex");
1244                    let sheet_name = self.graph.sheet_name(anchor.sheet_id).to_string();
1245                    self.mirror_value_to_overlay(
1246                        &sheet_name,
1247                        anchor.coord.row() + 1,
1248                        anchor.coord.col() + 1,
1249                        &err_val,
1250                    );
1251                }
1252                Ok(err_val)
1253            }
1254        }
1255    }
1256
1257    fn evaluate_named_scalar(
1258        &mut self,
1259        vertex_id: VertexId,
1260        sheet_id: SheetId,
1261    ) -> Result<LiteralValue, ExcelError> {
1262        let named_range = self.graph.named_range_by_vertex(vertex_id).ok_or_else(|| {
1263            ExcelError::new(ExcelErrorKind::Name)
1264                .with_message("Named range metadata missing".to_string())
1265        })?;
1266
1267        match &named_range.definition {
1268            NamedDefinition::Cell(cell_ref) => {
1269                if let Some(dep_vertex) = self.graph.get_vertex_for_cell(cell_ref) {
1270                    let value = if let Some(existing) = self.graph.get_value(dep_vertex) {
1271                        existing.clone()
1272                    } else {
1273                        let computed = self.evaluate_vertex(dep_vertex)?;
1274                        self.graph.update_vertex_value(dep_vertex, computed.clone());
1275                        computed
1276                    };
1277                    self.graph.update_vertex_value(vertex_id, value.clone());
1278                    Ok(value)
1279                } else {
1280                    let empty = LiteralValue::Empty;
1281                    self.graph.update_vertex_value(vertex_id, empty.clone());
1282                    Ok(empty)
1283                }
1284            }
1285            NamedDefinition::Formula { ast, .. } => {
1286                let context_sheet = match named_range.scope {
1287                    NameScope::Sheet(id) => id,
1288                    NameScope::Workbook => sheet_id,
1289                };
1290                let sheet_name = self.graph.sheet_name(context_sheet);
1291                let cell_ref = self
1292                    .graph
1293                    .get_cell_ref(vertex_id)
1294                    .unwrap_or_else(|| self.graph.make_cell_ref(sheet_name, 0, 0));
1295                let interpreter = Interpreter::new_with_cell(self, sheet_name, cell_ref);
1296                match interpreter.evaluate_ast(ast) {
1297                    Ok(LiteralValue::Array(_)) => {
1298                        let err = ExcelError::new(ExcelErrorKind::NImpl)
1299                            .with_message("Array result in scalar named range".to_string());
1300                        let err_val = LiteralValue::Error(err.clone());
1301                        self.graph.update_vertex_value(vertex_id, err_val.clone());
1302                        Ok(err_val)
1303                    }
1304                    Ok(value) => {
1305                        self.graph.update_vertex_value(vertex_id, value.clone());
1306                        Ok(value)
1307                    }
1308                    Err(err) => {
1309                        let err_val = LiteralValue::Error(err.clone());
1310                        self.graph.update_vertex_value(vertex_id, err_val.clone());
1311                        Ok(err_val)
1312                    }
1313                }
1314            }
1315            NamedDefinition::Range(_) => Err(ExcelError::new(ExcelErrorKind::NImpl)
1316                .with_message("Array named ranges not yet implemented".to_string())),
1317        }
1318    }
1319
1320    /// Evaluate only the necessary precedents for specific target cells (demand-driven)
1321    pub fn evaluate_until(
1322        &mut self,
1323        targets: &[(&str, u32, u32)],
1324    ) -> Result<EvalResult, ExcelError> {
1325        #[cfg(feature = "tracing")]
1326        let _span_eval = tracing::info_span!("evaluate_until", targets = targets.len()).entered();
1327        let start = web_time::Instant::now();
1328
1329        // Parse target cell addresses
1330        let mut target_addrs = Vec::new();
1331        for (sheet, row, col) in targets {
1332            // For now, assume simple A1-style references on default sheet
1333            // TODO: Parse complex references with sheets
1334            let sheet_id = self.graph.sheet_id_mut(sheet);
1335            let coord = Coord::from_excel(*row, *col, true, true);
1336            target_addrs.push(CellRef::new(sheet_id, coord));
1337        }
1338
1339        // Find vertex IDs for targets
1340        let mut target_vertex_ids = Vec::new();
1341        for addr in &target_addrs {
1342            if let Some(vertex_id) = self.graph.get_vertex_id_for_address(addr) {
1343                target_vertex_ids.push(*vertex_id);
1344            }
1345        }
1346
1347        if target_vertex_ids.is_empty() {
1348            return Ok(EvalResult {
1349                computed_vertices: 0,
1350                cycle_errors: 0,
1351                elapsed: start.elapsed(),
1352            });
1353        }
1354
1355        // Build demand subgraph with virtual edges for compressed ranges
1356        #[cfg(feature = "tracing")]
1357        let _span_sub = tracing::info_span!("demand_subgraph_build").entered();
1358        let (precedents_to_eval, vdeps) = self.build_demand_subgraph(&target_vertex_ids);
1359        #[cfg(feature = "tracing")]
1360        drop(_span_sub);
1361
1362        if precedents_to_eval.is_empty() {
1363            return Ok(EvalResult {
1364                computed_vertices: 0,
1365                cycle_errors: 0,
1366                elapsed: start.elapsed(),
1367            });
1368        }
1369
1370        // Create schedule for the minimal subgraph, honoring virtual edges
1371        let scheduler = Scheduler::new(&self.graph);
1372        #[cfg(feature = "tracing")]
1373        let _span_sched =
1374            tracing::info_span!("schedule_build", vertices = precedents_to_eval.len()).entered();
1375        let schedule = scheduler.create_schedule_with_virtual(&precedents_to_eval, &vdeps)?;
1376        #[cfg(feature = "tracing")]
1377        drop(_span_sched);
1378
1379        // Handle cycles first
1380        let mut cycle_errors = 0;
1381        for cycle in &schedule.cycles {
1382            cycle_errors += 1;
1383            let circ_error = LiteralValue::Error(
1384                ExcelError::new(ExcelErrorKind::Circ)
1385                    .with_message("Circular dependency detected".to_string()),
1386            );
1387            for &vertex_id in cycle {
1388                self.graph
1389                    .update_vertex_value(vertex_id, circ_error.clone());
1390            }
1391        }
1392
1393        // Evaluate layers (parallel when enabled, mirroring evaluate_all)
1394        let mut computed_vertices = 0;
1395        for layer in &schedule.layers {
1396            if self.thread_pool.is_some() && layer.vertices.len() > 1 {
1397                computed_vertices += self.evaluate_layer_parallel(layer)?;
1398            } else {
1399                computed_vertices += self.evaluate_layer_sequential(layer)?;
1400            }
1401        }
1402
1403        // Clear warmup context at end of evaluation
1404
1405        // Clear dirty flags for evaluated vertices
1406        self.graph.clear_dirty_flags(&precedents_to_eval);
1407
1408        // Re-dirty volatile vertices
1409        self.graph.redirty_volatiles();
1410
1411        Ok(EvalResult {
1412            computed_vertices,
1413            cycle_errors,
1414            elapsed: start.elapsed(),
1415        })
1416    }
1417
1418    /// Build a reusable evaluation plan that covers every formula vertex in the workbook.
1419    pub fn build_recalc_plan(&self) -> Result<RecalcPlan, ExcelError> {
1420        let mut vertices: Vec<VertexId> = self.graph.vertices_with_formulas().collect();
1421        vertices.sort_unstable();
1422        if vertices.is_empty() {
1423            return Ok(RecalcPlan {
1424                schedule: crate::engine::Schedule {
1425                    layers: Vec::new(),
1426                    cycles: Vec::new(),
1427                },
1428            });
1429        }
1430
1431        let scheduler = Scheduler::new(&self.graph);
1432        let schedule = scheduler.create_schedule(&vertices)?;
1433        Ok(RecalcPlan { schedule })
1434    }
1435
1436    /// Evaluate using a previously constructed plan. This avoids rebuilding layer schedules for each run.
1437    pub fn evaluate_recalc_plan(&mut self, plan: &RecalcPlan) -> Result<EvalResult, ExcelError> {
1438        if self.config.defer_graph_building {
1439            self.build_graph_all()?;
1440        }
1441
1442        let start = web_time::Instant::now();
1443        let dirty_vertices = self.graph.get_evaluation_vertices();
1444        if dirty_vertices.is_empty() {
1445            return Ok(EvalResult {
1446                computed_vertices: 0,
1447                cycle_errors: 0,
1448                elapsed: start.elapsed(),
1449            });
1450        }
1451
1452        let dirty_set: FxHashSet<VertexId> = dirty_vertices.iter().copied().collect();
1453        let mut computed_vertices = 0;
1454        let mut cycle_errors = 0;
1455
1456        if !plan.schedule.cycles.is_empty() {
1457            let circ_error = LiteralValue::Error(
1458                ExcelError::new(ExcelErrorKind::Circ)
1459                    .with_message("Circular dependency detected".to_string()),
1460            );
1461            for cycle in &plan.schedule.cycles {
1462                if !cycle.iter().any(|v| dirty_set.contains(v)) {
1463                    continue;
1464                }
1465                cycle_errors += 1;
1466                for &vertex_id in cycle {
1467                    if dirty_set.contains(&vertex_id) {
1468                        self.graph
1469                            .update_vertex_value(vertex_id, circ_error.clone());
1470                    }
1471                }
1472            }
1473        }
1474
1475        for layer in &plan.schedule.layers {
1476            let work: Vec<VertexId> = layer
1477                .vertices
1478                .iter()
1479                .copied()
1480                .filter(|v| dirty_set.contains(v))
1481                .collect();
1482            if work.is_empty() {
1483                continue;
1484            }
1485            let temp_layer = crate::engine::scheduler::Layer { vertices: work };
1486            if self.thread_pool.is_some() && temp_layer.vertices.len() > 1 {
1487                computed_vertices += self.evaluate_layer_parallel(&temp_layer)?;
1488            } else {
1489                computed_vertices += self.evaluate_layer_sequential(&temp_layer)?;
1490            }
1491        }
1492
1493        self.graph.clear_dirty_flags(&dirty_vertices);
1494        self.graph.redirty_volatiles();
1495
1496        Ok(EvalResult {
1497            computed_vertices,
1498            cycle_errors,
1499            elapsed: start.elapsed(),
1500        })
1501    }
1502
1503    /// Evaluate all dirty/volatile vertices
1504    pub fn evaluate_all(&mut self) -> Result<EvalResult, ExcelError> {
1505        if self.config.defer_graph_building {
1506            // Build graph for all staged formulas before evaluating
1507            self.build_graph_all()?;
1508        }
1509        #[cfg(feature = "tracing")]
1510        let _span_eval = tracing::info_span!("evaluate_all").entered();
1511        let start = web_time::Instant::now();
1512        let mut computed_vertices = 0;
1513        let mut cycle_errors = 0;
1514
1515        let to_evaluate = self.graph.get_evaluation_vertices();
1516        if to_evaluate.is_empty() {
1517            return Ok(EvalResult {
1518                computed_vertices,
1519                cycle_errors,
1520                elapsed: start.elapsed(),
1521            });
1522        }
1523
1524        let scheduler = Scheduler::new(&self.graph);
1525        let schedule = scheduler.create_schedule(&to_evaluate)?;
1526
1527        // Handle cycles first by marking them with #CIRC!
1528        for cycle in &schedule.cycles {
1529            cycle_errors += 1;
1530            let circ_error = LiteralValue::Error(
1531                ExcelError::new(ExcelErrorKind::Circ)
1532                    .with_message("Circular dependency detected".to_string()),
1533            );
1534            for &vertex_id in cycle {
1535                self.graph
1536                    .update_vertex_value(vertex_id, circ_error.clone());
1537            }
1538        }
1539
1540        // Evaluate acyclic layers (parallel or sequential based on config)
1541        for layer in &schedule.layers {
1542            if self.thread_pool.is_some() && layer.vertices.len() > 1 {
1543                computed_vertices += self.evaluate_layer_parallel(layer)?;
1544            } else {
1545                computed_vertices += self.evaluate_layer_sequential(layer)?;
1546            }
1547        }
1548
1549        // Clear dirty flags for all evaluated vertices (including cycles)
1550        self.graph.clear_dirty_flags(&to_evaluate);
1551
1552        // Re-dirty volatile vertices for the next evaluation cycle
1553        self.graph.redirty_volatiles();
1554
1555        // Advance recalc epoch after a full evaluation pass finishes
1556        self.recalc_epoch = self.recalc_epoch.wrapping_add(1);
1557
1558        Ok(EvalResult {
1559            computed_vertices,
1560            cycle_errors,
1561            elapsed: start.elapsed(),
1562        })
1563    }
1564
1565    /// Convenience: demand-driven evaluation of a single cell by sheet name and row/col.
1566    ///
1567    /// This will evaluate only the minimal set of dirty / volatile precedents required
1568    /// to bring the target cell up-to-date (as if a user asked for that single value),
1569    /// rather than scheduling a full workbook recalc. If the cell is already clean and
1570    /// non-volatile, no vertices will be recomputed.
1571    ///
1572    /// Returns the (possibly newly computed) value stored for the cell afterwards.
1573    /// Empty cells return None. Errors are surfaced via the Result type.
1574    pub fn evaluate_cell(
1575        &mut self,
1576        sheet: &str,
1577        row: u32,
1578        col: u32,
1579    ) -> Result<Option<LiteralValue>, ExcelError> {
1580        if row == 0 || col == 0 {
1581            return Err(ExcelError::new(ExcelErrorKind::Ref)
1582                .with_message("Row and column must be >= 1".to_string()));
1583        }
1584
1585        if self.config.defer_graph_building {
1586            self.build_graph_for_sheets(std::iter::once(sheet))?;
1587        }
1588
1589        let result = self.evaluate_cells(&[(sheet, row, col)])?;
1590
1591        match result.len() {
1592            0 => Ok(None),
1593            1 => {
1594                let v = result.into_iter().next().unwrap();
1595                Ok(v)
1596            }
1597            _ => unreachable!("evaluate_cells returned unexpected length"),
1598        }
1599    }
1600
1601    /// Convenience: demand-driven evaluation of multiple cells; accepts a slice of
1602    /// (sheet, row, col) triples. The union of required dirty / volatile precedents
1603    /// is computed once and evaluated, which is typically faster than calling
1604    /// `evaluate_cell` repeatedly for a related set of targets.
1605    ///
1606    /// Returns the resulting values for each requested target in the same order.
1607    pub fn evaluate_cells(
1608        &mut self,
1609        targets: &[(&str, u32, u32)],
1610    ) -> Result<Vec<Option<LiteralValue>>, ExcelError> {
1611        if targets.is_empty() {
1612            return Ok(Vec::new());
1613        }
1614        if self.config.defer_graph_building {
1615            let mut sheets: rustc_hash::FxHashSet<&str> = rustc_hash::FxHashSet::default();
1616            for (s, _, _) in targets.iter() {
1617                sheets.insert(*s);
1618            }
1619            self.build_graph_for_sheets(sheets.iter().cloned())?;
1620        }
1621        self.evaluate_until(targets)?;
1622        Ok(targets
1623            .iter()
1624            .map(|(s, r, c)| self.get_cell_value(s, *r, *c))
1625            .collect())
1626    }
1627
1628    /// Get the evaluation plan for target cells without actually evaluating them
1629    pub fn get_eval_plan(&self, targets: &[(&str, u32, u32)]) -> Result<EvalPlan, ExcelError> {
1630        if targets.is_empty() {
1631            return Ok(EvalPlan {
1632                total_vertices_to_evaluate: 0,
1633                layers: Vec::new(),
1634                cycles_detected: 0,
1635                dirty_count: 0,
1636                volatile_count: 0,
1637                parallel_enabled: self.config.enable_parallel && self.thread_pool.is_some(),
1638                estimated_parallel_layers: 0,
1639                target_cells: Vec::new(),
1640            });
1641        }
1642        if self.config.defer_graph_building {
1643            return Err(ExcelError::new(ExcelErrorKind::Value).with_message(
1644                "Evaluation plan requested with deferred graph; build first or call evaluate_*",
1645            ));
1646        }
1647
1648        // Convert targets to A1 notation for consistency
1649        let addresses: Vec<String> = targets
1650            .iter()
1651            .map(|(s, r, c)| format!("{}!{}{}", s, Self::col_to_letters(*c), r))
1652            .collect();
1653
1654        // Parse target cell addresses
1655        let mut target_addrs = Vec::new();
1656        for (sheet, row, col) in targets {
1657            if let Some(sheet_id) = self.graph.sheet_id(sheet) {
1658                let coord = Coord::from_excel(*row, *col, true, true);
1659                target_addrs.push(CellRef::new(sheet_id, coord));
1660            }
1661        }
1662
1663        // Find vertex IDs for targets
1664        let mut target_vertex_ids = Vec::new();
1665        for addr in &target_addrs {
1666            if let Some(vertex_id) = self.graph.get_vertex_id_for_address(addr) {
1667                target_vertex_ids.push(*vertex_id);
1668            }
1669        }
1670
1671        if target_vertex_ids.is_empty() {
1672            return Ok(EvalPlan {
1673                total_vertices_to_evaluate: 0,
1674                layers: Vec::new(),
1675                cycles_detected: 0,
1676                dirty_count: 0,
1677                volatile_count: 0,
1678                parallel_enabled: self.config.enable_parallel && self.thread_pool.is_some(),
1679                estimated_parallel_layers: 0,
1680                target_cells: addresses,
1681            });
1682        }
1683
1684        // Build demand subgraph with virtual edges (same as evaluate_until)
1685        let (precedents_to_eval, vdeps) = self.build_demand_subgraph(&target_vertex_ids);
1686
1687        if precedents_to_eval.is_empty() {
1688            return Ok(EvalPlan {
1689                total_vertices_to_evaluate: 0,
1690                layers: Vec::new(),
1691                cycles_detected: 0,
1692                dirty_count: 0,
1693                volatile_count: 0,
1694                parallel_enabled: self.config.enable_parallel && self.thread_pool.is_some(),
1695                estimated_parallel_layers: 0,
1696                target_cells: addresses,
1697            });
1698        }
1699
1700        // Count dirty and volatile vertices
1701        let mut dirty_count = 0;
1702        let mut volatile_count = 0;
1703        for &vertex_id in &precedents_to_eval {
1704            if self.graph.is_dirty(vertex_id) {
1705                dirty_count += 1;
1706            }
1707            if self.graph.is_volatile(vertex_id) {
1708                volatile_count += 1;
1709            }
1710        }
1711
1712        // Create schedule for the minimal subgraph honoring virtual edges
1713        let scheduler = Scheduler::new(&self.graph);
1714        let schedule = scheduler.create_schedule_with_virtual(&precedents_to_eval, &vdeps)?;
1715
1716        // Build layer information
1717        let mut layers = Vec::new();
1718        let mut estimated_parallel_layers = 0;
1719        let parallel_enabled = self.config.enable_parallel && self.thread_pool.is_some();
1720
1721        for layer in &schedule.layers {
1722            let parallel_eligible = parallel_enabled && layer.vertices.len() > 1;
1723            if parallel_eligible {
1724                estimated_parallel_layers += 1;
1725            }
1726
1727            // Get sample cell addresses (up to 5)
1728            let sample_cells: Vec<String> = layer
1729                .vertices
1730                .iter()
1731                .take(5)
1732                .filter_map(|&vertex_id| {
1733                    self.graph
1734                        .get_cell_ref_for_vertex(vertex_id)
1735                        .map(|cell_ref| {
1736                            let sheet_name = self.graph.sheet_name(cell_ref.sheet_id);
1737                            format!(
1738                                "{}!{}{}",
1739                                sheet_name,
1740                                Self::col_to_letters(cell_ref.coord.col()),
1741                                cell_ref.coord.row() + 1
1742                            )
1743                        })
1744                })
1745                .collect();
1746
1747            layers.push(LayerInfo {
1748                vertex_count: layer.vertices.len(),
1749                parallel_eligible,
1750                sample_cells,
1751            });
1752        }
1753
1754        Ok(EvalPlan {
1755            total_vertices_to_evaluate: precedents_to_eval.len(),
1756            layers,
1757            cycles_detected: schedule.cycles.len(),
1758            dirty_count,
1759            volatile_count,
1760            parallel_enabled,
1761            estimated_parallel_layers,
1762            target_cells: addresses,
1763        })
1764    }
1765
1766    /// Build a demand-driven subgraph for the given targets, including ephemeral edges for
1767    /// compressed ranges, and returning the set of dirty/volatile precedents and virtual deps.
1768    fn build_demand_subgraph(
1769        &self,
1770        target_vertices: &[VertexId],
1771    ) -> (
1772        Vec<VertexId>,
1773        rustc_hash::FxHashMap<VertexId, Vec<VertexId>>,
1774    ) {
1775        #[cfg(feature = "tracing")]
1776        let _span =
1777            tracing::info_span!("demand_subgraph", targets = target_vertices.len()).entered();
1778        use rustc_hash::{FxHashMap, FxHashSet};
1779
1780        let mut to_evaluate: FxHashSet<VertexId> = FxHashSet::default();
1781        let mut visited: FxHashSet<VertexId> = FxHashSet::default();
1782        let mut stack: Vec<VertexId> = Vec::new();
1783        let mut vdeps: FxHashMap<VertexId, Vec<VertexId>> = FxHashMap::default(); // incoming deps per vertex
1784
1785        for &t in target_vertices {
1786            stack.push(t);
1787        }
1788
1789        while let Some(v) = stack.pop() {
1790            if !visited.insert(v) {
1791                continue;
1792            }
1793            if !self.graph.vertex_exists(v) {
1794                continue;
1795            }
1796            // Only schedule dirty/volatile formulas
1797            match self.graph.get_vertex_kind(v) {
1798                VertexKind::FormulaScalar | VertexKind::FormulaArray => {
1799                    if self.graph.is_dirty(v) || self.graph.is_volatile(v) {
1800                        to_evaluate.insert(v);
1801                    }
1802                }
1803                _ => {}
1804            }
1805
1806            // Explicit dependencies (graph edges)
1807            for dep in self.graph.get_dependencies(v) {
1808                if self.graph.vertex_exists(dep) {
1809                    match self.graph.get_vertex_kind(dep) {
1810                        VertexKind::FormulaScalar | VertexKind::FormulaArray => {
1811                            if !visited.contains(&dep) {
1812                                stack.push(dep);
1813                            }
1814                        }
1815                        _ => {}
1816                    }
1817                }
1818            }
1819
1820            // Compressed range dependencies → discover formula precedents in used/bounded window
1821            if let Some(ranges) = self.graph.get_range_dependencies(v) {
1822                let current_sheet_id = self.graph.get_vertex_sheet_id(v);
1823                for r in ranges {
1824                    let sheet_id = match r.sheet {
1825                        formualizer_common::SheetLocator::Id(id) => id,
1826                        _ => current_sheet_id,
1827                    };
1828                    let sheet_name = self.graph.sheet_name(sheet_id);
1829
1830                    // Infer bounds like resolve_range_view (r bounds are 0-based)
1831                    let mut sr = r.start_row.map(|b| b.index + 1);
1832                    let mut sc = r.start_col.map(|b| b.index + 1);
1833                    let mut er = r.end_row.map(|b| b.index + 1);
1834                    let mut ec = r.end_col.map(|b| b.index + 1);
1835
1836                    if sr.is_none() && er.is_none() {
1837                        let scv = sc.unwrap_or(1);
1838                        let ecv = ec.unwrap_or(scv);
1839                        if let Some((min_r, max_r)) =
1840                            self.used_rows_for_columns(sheet_name, scv, ecv)
1841                        {
1842                            sr = Some(min_r);
1843                            er = Some(max_r);
1844                        } else if let Some((max_rows, _)) = self.sheet_bounds(sheet_name) {
1845                            sr = Some(1);
1846                            er = Some(max_rows);
1847                        }
1848                    }
1849                    if sc.is_none() && ec.is_none() {
1850                        let srv = sr.unwrap_or(1);
1851                        let erv = er.unwrap_or(srv);
1852                        if let Some((min_c, max_c)) = self.used_cols_for_rows(sheet_name, srv, erv)
1853                        {
1854                            sc = Some(min_c);
1855                            ec = Some(max_c);
1856                        } else if let Some((_, max_cols)) = self.sheet_bounds(sheet_name) {
1857                            sc = Some(1);
1858                            ec = Some(max_cols);
1859                        }
1860                    }
1861                    if sr.is_some() && er.is_none() {
1862                        let scv = sc.unwrap_or(1);
1863                        let ecv = ec.unwrap_or(scv);
1864                        if let Some((_, max_r)) = self.used_rows_for_columns(sheet_name, scv, ecv) {
1865                            er = Some(max_r);
1866                        } else if let Some((max_rows, _)) = self.sheet_bounds(sheet_name) {
1867                            er = Some(max_rows);
1868                        }
1869                    }
1870                    if er.is_some() && sr.is_none() {
1871                        let scv = sc.unwrap_or(1);
1872                        let ecv = ec.unwrap_or(scv);
1873                        if let Some((min_r, _)) = self.used_rows_for_columns(sheet_name, scv, ecv) {
1874                            sr = Some(min_r);
1875                        } else {
1876                            sr = Some(1);
1877                        }
1878                    }
1879                    if sc.is_some() && ec.is_none() {
1880                        let srv = sr.unwrap_or(1);
1881                        let erv = er.unwrap_or(srv);
1882                        if let Some((_, max_c)) = self.used_cols_for_rows(sheet_name, srv, erv) {
1883                            ec = Some(max_c);
1884                        } else if let Some((_, max_cols)) = self.sheet_bounds(sheet_name) {
1885                            ec = Some(max_cols);
1886                        }
1887                    }
1888                    if ec.is_some() && sc.is_none() {
1889                        let srv = sr.unwrap_or(1);
1890                        let erv = er.unwrap_or(srv);
1891                        if let Some((min_c, _)) = self.used_cols_for_rows(sheet_name, srv, erv) {
1892                            sc = Some(min_c);
1893                        } else {
1894                            sc = Some(1);
1895                        }
1896                    }
1897
1898                    let sr = sr.unwrap_or(1);
1899                    let sc = sc.unwrap_or(1);
1900                    let er = er.unwrap_or(sr.saturating_sub(1));
1901                    let ec = ec.unwrap_or(sc.saturating_sub(1));
1902                    if er < sr || ec < sc {
1903                        continue;
1904                    }
1905
1906                    if let Some(index) = self.graph.sheet_index(sheet_id) {
1907                        let sr0 = sr.saturating_sub(1);
1908                        let er0 = er.saturating_sub(1);
1909                        let sc0 = sc.saturating_sub(1);
1910                        let ec0 = ec.saturating_sub(1);
1911                        // enumerate vertices in col range, filter row and kind
1912                        for u in index.vertices_in_col_range(sc0, ec0) {
1913                            let pc = self.graph.vertex_coord(u);
1914                            let row0 = pc.row();
1915                            if row0 < sr0 || row0 > er0 {
1916                                continue;
1917                            }
1918                            match self.graph.get_vertex_kind(u) {
1919                                VertexKind::FormulaScalar | VertexKind::FormulaArray => {
1920                                    // only link and schedule if producer is dirty/volatile
1921                                    if (self.graph.is_dirty(u) || self.graph.is_volatile(u))
1922                                        && u != v
1923                                    {
1924                                        vdeps.entry(v).or_default().push(u);
1925                                        if !visited.contains(&u) {
1926                                            stack.push(u);
1927                                        }
1928                                    }
1929                                }
1930                                _ => {}
1931                            }
1932                        }
1933                    }
1934                }
1935            }
1936        }
1937
1938        let mut result: Vec<VertexId> = to_evaluate.into_iter().collect();
1939        result.sort_unstable();
1940        // Dedup virtual deps
1941        for deps in vdeps.values_mut() {
1942            deps.sort_unstable();
1943            deps.dedup();
1944        }
1945        (result, vdeps)
1946    }
1947
1948    /// Helper: convert 1-based column index to Excel-style letters (1 -> A, 27 -> AA)
1949    fn col_to_letters(col: u32) -> String {
1950        col_letters_from_1based(col).expect("column index must be >= 1")
1951    }
1952
1953    /// Evaluate all dirty/volatile vertices with cancellation support
1954    pub fn evaluate_all_cancellable(
1955        &mut self,
1956        cancel_flag: &AtomicBool,
1957    ) -> Result<EvalResult, ExcelError> {
1958        let start = web_time::Instant::now();
1959        let mut computed_vertices = 0;
1960        let mut cycle_errors = 0;
1961
1962        let to_evaluate = self.graph.get_evaluation_vertices();
1963        if to_evaluate.is_empty() {
1964            return Ok(EvalResult {
1965                computed_vertices,
1966                cycle_errors,
1967                elapsed: start.elapsed(),
1968            });
1969        }
1970
1971        let scheduler = Scheduler::new(&self.graph);
1972        let schedule = scheduler.create_schedule(&to_evaluate)?;
1973
1974        // Handle cycles first by marking them with #CIRC!
1975        for cycle in &schedule.cycles {
1976            // Check cancellation between cycles
1977            if cancel_flag.load(Ordering::Relaxed) {
1978                return Err(ExcelError::new(ExcelErrorKind::Cancelled)
1979                    .with_message("Evaluation cancelled during cycle handling".to_string()));
1980            }
1981
1982            cycle_errors += 1;
1983            let circ_error = LiteralValue::Error(
1984                ExcelError::new(ExcelErrorKind::Circ)
1985                    .with_message("Circular dependency detected".to_string()),
1986            );
1987            for &vertex_id in cycle {
1988                self.graph
1989                    .update_vertex_value(vertex_id, circ_error.clone());
1990            }
1991        }
1992
1993        // Evaluate acyclic layers sequentially with cancellation checks
1994        for layer in &schedule.layers {
1995            // Check cancellation between layers
1996            if cancel_flag.load(Ordering::Relaxed) {
1997                return Err(ExcelError::new(ExcelErrorKind::Cancelled)
1998                    .with_message("Evaluation cancelled between layers".to_string()));
1999            }
2000
2001            // Evaluate vertices in this layer (parallel or sequential)
2002            if self.thread_pool.is_some() && layer.vertices.len() > 1 {
2003                computed_vertices +=
2004                    self.evaluate_layer_parallel_cancellable(layer, cancel_flag)?;
2005            } else {
2006                computed_vertices +=
2007                    self.evaluate_layer_sequential_cancellable(layer, cancel_flag)?;
2008            }
2009        }
2010
2011        // Clear dirty flags for all evaluated vertices (including cycles)
2012        self.graph.clear_dirty_flags(&to_evaluate);
2013
2014        // Re-dirty volatile vertices for the next evaluation cycle
2015        self.graph.redirty_volatiles();
2016
2017        Ok(EvalResult {
2018            computed_vertices,
2019            cycle_errors,
2020            elapsed: start.elapsed(),
2021        })
2022    }
2023
2024    /// Evaluate only the necessary precedents for specific target cells with cancellation support
2025    pub fn evaluate_until_cancellable(
2026        &mut self,
2027        targets: &[&str],
2028        cancel_flag: &AtomicBool,
2029    ) -> Result<EvalResult, ExcelError> {
2030        let start = web_time::Instant::now();
2031
2032        // Parse target cell addresses
2033        let mut target_addrs = Vec::new();
2034        for target in targets {
2035            let (sheet, row, col) = self.parse_a1_notation(target)?;
2036            let sheet_id = self.graph.sheet_id_mut(&sheet);
2037            let coord = Coord::from_excel(row, col, true, true);
2038            target_addrs.push(CellRef::new(sheet_id, coord));
2039        }
2040
2041        // Find vertex IDs for targets
2042        let mut target_vertex_ids = Vec::new();
2043        for addr in &target_addrs {
2044            if let Some(vertex_id) = self.graph.get_vertex_id_for_address(addr) {
2045                target_vertex_ids.push(*vertex_id);
2046            }
2047        }
2048
2049        if target_vertex_ids.is_empty() {
2050            return Ok(EvalResult {
2051                computed_vertices: 0,
2052                cycle_errors: 0,
2053                elapsed: start.elapsed(),
2054            });
2055        }
2056
2057        // Build demand subgraph with virtual edges
2058        let (precedents_to_eval, vdeps) = self.build_demand_subgraph(&target_vertex_ids);
2059
2060        if precedents_to_eval.is_empty() {
2061            return Ok(EvalResult {
2062                computed_vertices: 0,
2063                cycle_errors: 0,
2064                elapsed: start.elapsed(),
2065            });
2066        }
2067
2068        // Create schedule honoring virtual edges
2069        let scheduler = Scheduler::new(&self.graph);
2070        let schedule = scheduler.create_schedule_with_virtual(&precedents_to_eval, &vdeps)?;
2071
2072        // Handle cycles first
2073        let mut cycle_errors = 0;
2074        for cycle in &schedule.cycles {
2075            // Check cancellation between cycles
2076            if cancel_flag.load(Ordering::Relaxed) {
2077                return Err(ExcelError::new(ExcelErrorKind::Cancelled).with_message(
2078                    "Demand-driven evaluation cancelled during cycle handling".to_string(),
2079                ));
2080            }
2081
2082            cycle_errors += 1;
2083            let circ_error = LiteralValue::Error(
2084                ExcelError::new(ExcelErrorKind::Circ)
2085                    .with_message("Circular dependency detected".to_string()),
2086            );
2087            for &vertex_id in cycle {
2088                self.graph
2089                    .update_vertex_value(vertex_id, circ_error.clone());
2090            }
2091        }
2092
2093        // Evaluate layers with cancellation checks
2094        let mut computed_vertices = 0;
2095        for layer in &schedule.layers {
2096            // Check cancellation between layers
2097            if cancel_flag.load(Ordering::Relaxed) {
2098                return Err(ExcelError::new(ExcelErrorKind::Cancelled).with_message(
2099                    "Demand-driven evaluation cancelled between layers".to_string(),
2100                ));
2101            }
2102
2103            // Evaluate vertices in this layer (parallel or sequential)
2104            if self.thread_pool.is_some() && layer.vertices.len() > 1 {
2105                computed_vertices +=
2106                    self.evaluate_layer_parallel_cancellable(layer, cancel_flag)?;
2107            } else {
2108                computed_vertices +=
2109                    self.evaluate_layer_sequential_cancellable_demand_driven(layer, cancel_flag)?;
2110            }
2111        }
2112
2113        // Clear dirty flags for evaluated vertices
2114        self.graph.clear_dirty_flags(&precedents_to_eval);
2115
2116        // Re-dirty volatile vertices
2117        self.graph.redirty_volatiles();
2118
2119        Ok(EvalResult {
2120            computed_vertices,
2121            cycle_errors,
2122            elapsed: start.elapsed(),
2123        })
2124    }
2125
2126    fn parse_a1_notation(&self, address: &str) -> Result<(String, u32, u32), ExcelError> {
2127        let mut parts = address.splitn(2, '!');
2128        let first = parts.next().unwrap_or_default();
2129        let remainder = parts.next();
2130
2131        let (sheet, cell_part) = match remainder {
2132            Some(cell) => (first.to_string(), cell),
2133            None => (self.default_sheet_name().to_string(), first),
2134        };
2135
2136        let (row, col, _, _) = parse_a1_1based(cell_part).map_err(|err| {
2137            ExcelError::new(ExcelErrorKind::Ref)
2138                .with_message(format!("Invalid cell reference `{cell_part}`: {err}"))
2139        })?;
2140
2141        Ok((sheet, row, col))
2142    }
2143
2144    /// Determine volatility using this engine's FunctionProvider, falling back to global registry.
2145    fn is_ast_volatile_with_provider(&self, ast: &ASTNode) -> bool {
2146        use formualizer_parse::parser::ASTNodeType;
2147        match &ast.node_type {
2148            ASTNodeType::Function { name, args, .. } => {
2149                if let Some(func) = self
2150                    .get_function("", name)
2151                    .or_else(|| crate::function_registry::get("", name))
2152                    && func.caps().contains(crate::function::FnCaps::VOLATILE)
2153                {
2154                    return true;
2155                }
2156                args.iter()
2157                    .any(|arg| self.is_ast_volatile_with_provider(arg))
2158            }
2159            ASTNodeType::BinaryOp { left, right, .. } => {
2160                self.is_ast_volatile_with_provider(left)
2161                    || self.is_ast_volatile_with_provider(right)
2162            }
2163            ASTNodeType::UnaryOp { expr, .. } => self.is_ast_volatile_with_provider(expr),
2164            ASTNodeType::Array(rows) => rows.iter().any(|row| {
2165                row.iter()
2166                    .any(|cell| self.is_ast_volatile_with_provider(cell))
2167            }),
2168            _ => false,
2169        }
2170    }
2171
2172    /// Find dirty precedents that need evaluation for the given target vertices
2173    fn find_dirty_precedents(&self, target_vertices: &[VertexId]) -> Vec<VertexId> {
2174        use rustc_hash::FxHashSet;
2175
2176        let mut to_evaluate = FxHashSet::default();
2177        let mut visited = FxHashSet::default();
2178        let mut stack = Vec::new();
2179
2180        // Start reverse traversal from target vertices
2181        for &target in target_vertices {
2182            stack.push(target);
2183        }
2184
2185        while let Some(vertex_id) = stack.pop() {
2186            if !visited.insert(vertex_id) {
2187                continue; // Already processed
2188            }
2189
2190            if self.graph.vertex_exists(vertex_id) {
2191                // Check if this vertex needs evaluation
2192                let kind = self.graph.get_vertex_kind(vertex_id);
2193                let needs_eval = match kind {
2194                    super::vertex::VertexKind::FormulaScalar
2195                    | super::vertex::VertexKind::FormulaArray => {
2196                        self.graph.is_dirty(vertex_id) || self.graph.is_volatile(vertex_id)
2197                    }
2198                    _ => false, // Values and empty cells don't need evaluation
2199                };
2200
2201                if needs_eval {
2202                    to_evaluate.insert(vertex_id);
2203                }
2204
2205                // Continue traversal to dependencies (precedents)
2206                let dependencies = self.graph.get_dependencies(vertex_id);
2207                for &dep_id in &dependencies {
2208                    if !visited.contains(&dep_id) {
2209                        stack.push(dep_id);
2210                    }
2211                }
2212            }
2213        }
2214
2215        let mut result: Vec<VertexId> = to_evaluate.into_iter().collect();
2216        result.sort_unstable();
2217        result
2218    }
2219
2220    /// Evaluate a layer sequentially
2221    fn evaluate_layer_sequential(
2222        &mut self,
2223        layer: &super::scheduler::Layer,
2224    ) -> Result<usize, ExcelError> {
2225        for &vertex_id in &layer.vertices {
2226            self.evaluate_vertex(vertex_id)?;
2227        }
2228        Ok(layer.vertices.len())
2229    }
2230
2231    /// Evaluate a layer sequentially with cancellation support
2232    fn evaluate_layer_sequential_cancellable(
2233        &mut self,
2234        layer: &super::scheduler::Layer,
2235        cancel_flag: &AtomicBool,
2236    ) -> Result<usize, ExcelError> {
2237        for (i, &vertex_id) in layer.vertices.iter().enumerate() {
2238            // Check cancellation every 256 vertices to balance responsiveness with performance
2239            if i % 256 == 0 && cancel_flag.load(Ordering::Relaxed) {
2240                return Err(ExcelError::new(ExcelErrorKind::Cancelled)
2241                    .with_message("Evaluation cancelled within layer".to_string()));
2242            }
2243
2244            self.evaluate_vertex(vertex_id)?;
2245        }
2246        Ok(layer.vertices.len())
2247    }
2248
2249    /// Evaluate a layer sequentially with more frequent cancellation checks for demand-driven evaluation
2250    fn evaluate_layer_sequential_cancellable_demand_driven(
2251        &mut self,
2252        layer: &super::scheduler::Layer,
2253        cancel_flag: &AtomicBool,
2254    ) -> Result<usize, ExcelError> {
2255        for (i, &vertex_id) in layer.vertices.iter().enumerate() {
2256            // Check cancellation more frequently for demand-driven evaluation (every 128 vertices)
2257            if i % 128 == 0 && cancel_flag.load(Ordering::Relaxed) {
2258                return Err(ExcelError::new(ExcelErrorKind::Cancelled)
2259                    .with_message("Demand-driven evaluation cancelled within layer".to_string()));
2260            }
2261
2262            self.evaluate_vertex(vertex_id)?;
2263        }
2264        Ok(layer.vertices.len())
2265    }
2266
2267    /// Evaluate a layer in parallel using the thread pool
2268    fn evaluate_layer_parallel(
2269        &mut self,
2270        layer: &super::scheduler::Layer,
2271    ) -> Result<usize, ExcelError> {
2272        use rayon::prelude::*;
2273
2274        let thread_pool = self.thread_pool.as_ref().unwrap();
2275
2276        // Collect all evaluation results first, then update the graph sequentially
2277        let results: Result<Vec<(VertexId, LiteralValue)>, ExcelError> =
2278            thread_pool.install(|| {
2279                layer
2280                    .vertices
2281                    .par_iter()
2282                    .map(
2283                        |&vertex_id| match self.evaluate_vertex_immutable(vertex_id) {
2284                            Ok(v) => Ok((vertex_id, v)),
2285                            Err(e) => Ok((vertex_id, LiteralValue::Error(e))),
2286                        },
2287                    )
2288                    .collect()
2289            });
2290
2291        // Update the graph with results sequentially (thread-safe)
2292        match results {
2293            Ok(vertex_results) => {
2294                for (vertex_id, result) in vertex_results {
2295                    self.graph.update_vertex_value(vertex_id, result.clone());
2296                    self.mirror_vertex_value_to_overlay(vertex_id, &result);
2297                }
2298                Ok(layer.vertices.len())
2299            }
2300            Err(e) => Err(e),
2301        }
2302    }
2303
2304    /// Evaluate a layer in parallel with cancellation support
2305    fn evaluate_layer_parallel_cancellable(
2306        &mut self,
2307        layer: &super::scheduler::Layer,
2308        cancel_flag: &AtomicBool,
2309    ) -> Result<usize, ExcelError> {
2310        use rayon::prelude::*;
2311
2312        let thread_pool = self.thread_pool.as_ref().unwrap();
2313
2314        // Check cancellation before starting parallel work
2315        if cancel_flag.load(Ordering::Relaxed) {
2316            return Err(ExcelError::new(ExcelErrorKind::Cancelled)
2317                .with_message("Parallel evaluation cancelled before starting".to_string()));
2318        }
2319
2320        // Collect all evaluation results first, then update the graph sequentially
2321        let results: Result<Vec<(VertexId, LiteralValue)>, ExcelError> =
2322            thread_pool.install(|| {
2323                layer
2324                    .vertices
2325                    .par_iter()
2326                    .map(|&vertex_id| {
2327                        // Check cancellation periodically during parallel work
2328                        if cancel_flag.load(Ordering::Relaxed) {
2329                            return Err(ExcelError::new(ExcelErrorKind::Cancelled).with_message(
2330                                "Parallel evaluation cancelled during execution".to_string(),
2331                            ));
2332                        }
2333                        match self.evaluate_vertex_immutable(vertex_id) {
2334                            Ok(v) => Ok((vertex_id, v)),
2335                            Err(e) => Ok((vertex_id, LiteralValue::Error(e))),
2336                        }
2337                    })
2338                    .collect()
2339            });
2340
2341        // Update the graph with results sequentially (thread-safe)
2342        match results {
2343            Ok(vertex_results) => {
2344                for (vertex_id, result) in vertex_results {
2345                    self.graph.update_vertex_value(vertex_id, result.clone());
2346                    self.mirror_vertex_value_to_overlay(vertex_id, &result);
2347                }
2348                Ok(layer.vertices.len())
2349            }
2350            Err(e) => Err(e),
2351        }
2352    }
2353
2354    /// Evaluate a single vertex without mutating the graph (for parallel evaluation)
2355    fn evaluate_vertex_immutable(&self, vertex_id: VertexId) -> Result<LiteralValue, ExcelError> {
2356        // Check if vertex exists
2357        if !self.graph.vertex_exists(vertex_id) {
2358            return Err(ExcelError::new(formualizer_common::ExcelErrorKind::Ref)
2359                .with_message(format!("Vertex not found: {vertex_id:?}")));
2360        }
2361
2362        // Get vertex kind and check if it needs evaluation
2363        let kind = self.graph.get_vertex_kind(vertex_id);
2364        let sheet_id = self.graph.get_vertex_sheet_id(vertex_id);
2365
2366        let ast_id = match kind {
2367            VertexKind::FormulaScalar => {
2368                if let Some(ast_id) = self.graph.get_formula_id(vertex_id) {
2369                    ast_id
2370                } else {
2371                    return Ok(LiteralValue::Int(0));
2372                }
2373            }
2374            VertexKind::Empty | VertexKind::Cell => {
2375                // Check if there's a value stored
2376                if let Some(value) = self.graph.get_value(vertex_id) {
2377                    return Ok(value.clone());
2378                } else {
2379                    return Ok(LiteralValue::Int(0)); // Empty cells evaluate to 0
2380                }
2381            }
2382            VertexKind::NamedScalar => {
2383                let named_range = self.graph.named_range_by_vertex(vertex_id).ok_or_else(|| {
2384                    ExcelError::new(ExcelErrorKind::Name)
2385                        .with_message("Named range metadata missing".to_string())
2386                })?;
2387
2388                return match &named_range.definition {
2389                    NamedDefinition::Cell(cell_ref) => {
2390                        if let Some(dep_vertex) = self.graph.get_vertex_for_cell(cell_ref)
2391                            && let Some(existing) = self.graph.get_value(dep_vertex)
2392                        {
2393                            return Ok(existing.clone());
2394                        }
2395                        let sheet_name = self.graph.sheet_name(cell_ref.sheet_id);
2396                        Ok(self
2397                            .graph
2398                            .get_cell_value(sheet_name, cell_ref.coord.row(), cell_ref.coord.col())
2399                            .unwrap_or(LiteralValue::Empty))
2400                    }
2401                    NamedDefinition::Formula { ast, .. } => {
2402                        let context_sheet = match named_range.scope {
2403                            NameScope::Sheet(id) => id,
2404                            NameScope::Workbook => sheet_id,
2405                        };
2406                        let sheet_name = self.graph.sheet_name(context_sheet);
2407                        let cell_ref = self
2408                            .graph
2409                            .get_cell_ref(vertex_id)
2410                            .unwrap_or_else(|| self.graph.make_cell_ref(sheet_name, 0, 0));
2411                        let interpreter = Interpreter::new_with_cell(self, sheet_name, cell_ref);
2412                        interpreter.evaluate_ast(ast)
2413                    }
2414                    NamedDefinition::Range(_) => Err(ExcelError::new(ExcelErrorKind::NImpl)
2415                        .with_message("Array named ranges not yet implemented".to_string())),
2416                };
2417            }
2418            _ => {
2419                return Ok(LiteralValue::Error(
2420                    ExcelError::new(formualizer_common::ExcelErrorKind::Na)
2421                        .with_message("Array formulas not yet supported".to_string()),
2422                ));
2423            }
2424        };
2425
2426        // The interpreter uses a reference to the engine as the context
2427        let sheet_name = self.graph.sheet_name(sheet_id);
2428        let cell_ref = self
2429            .graph
2430            .get_cell_ref(vertex_id)
2431            .expect("cell ref for vertex");
2432        let interpreter = Interpreter::new_with_cell(self, sheet_name, cell_ref);
2433
2434        interpreter.evaluate_arena_ast(ast_id, self.graph.data_store(), self.graph.sheet_reg())
2435    }
2436
2437    /// Get access to the shared thread pool for parallel evaluation
2438    pub fn thread_pool(&self) -> Option<&Arc<rayon::ThreadPool>> {
2439        self.thread_pool.as_ref()
2440    }
2441}
2442
2443#[derive(Default)]
2444struct RowBoundsCache {
2445    snapshot: u64,
2446    // key: (sheet_id, col_idx)
2447    map: rustc_hash::FxHashMap<(u32, usize), (Option<u32>, Option<u32>)>,
2448}
2449
2450impl RowBoundsCache {
2451    fn new(snapshot: u64) -> Self {
2452        Self {
2453            snapshot,
2454            map: Default::default(),
2455        }
2456    }
2457    fn get_row_bounds(
2458        &self,
2459        sheet_id: SheetId,
2460        col_idx: usize,
2461        snapshot: u64,
2462    ) -> Option<(Option<u32>, Option<u32>)> {
2463        if self.snapshot != snapshot {
2464            return None;
2465        }
2466        self.map.get(&(sheet_id as u32, col_idx)).copied()
2467    }
2468    fn put_row_bounds(
2469        &mut self,
2470        sheet_id: SheetId,
2471        col_idx: usize,
2472        snapshot: u64,
2473        bounds: (Option<u32>, Option<u32>),
2474    ) {
2475        if self.snapshot != snapshot {
2476            self.snapshot = snapshot;
2477            self.map.clear();
2478        }
2479        self.map.insert((sheet_id as u32, col_idx), bounds);
2480    }
2481}
2482
2483// Phase 2 shim: in-process spill manager delegating to current graph methods.
2484#[derive(Default)]
2485pub struct ShimSpillManager {
2486    region_locks: RegionLockManager,
2487    pub(crate) active_locks: rustc_hash::FxHashMap<VertexId, u64>,
2488}
2489
2490impl ShimSpillManager {
2491    pub(crate) fn reserve(
2492        &mut self,
2493        owner: VertexId,
2494        anchor_cell: CellRef,
2495        shape: SpillShape,
2496        _meta: SpillMeta,
2497    ) -> Result<(), ExcelError> {
2498        // Derive region from anchor + shape; enforce in-flight exclusivity only.
2499        let region = crate::engine::spill::Region {
2500            sheet_id: anchor_cell.sheet_id as u32,
2501            row_start: anchor_cell.coord.row(),
2502            row_end: anchor_cell
2503                .coord
2504                .row()
2505                .saturating_add(shape.rows)
2506                .saturating_sub(1),
2507            col_start: anchor_cell.coord.col(),
2508            col_end: anchor_cell
2509                .coord
2510                .col()
2511                .saturating_add(shape.cols)
2512                .saturating_sub(1),
2513        };
2514        match self.region_locks.reserve(region, owner) {
2515            Ok(id) => {
2516                if id != 0 {
2517                    self.active_locks.insert(owner, id);
2518                }
2519                Ok(())
2520            }
2521            Err(e) => Err(e),
2522        }
2523    }
2524
2525    pub(crate) fn commit_array(
2526        &mut self,
2527        graph: &mut DependencyGraph,
2528        anchor_vertex: VertexId,
2529        targets: &[CellRef],
2530        rows: Vec<Vec<LiteralValue>>,
2531    ) -> Result<(), ExcelError> {
2532        // Re-run plan on concrete targets before committing to respect blockers.
2533        let plan_res = graph.plan_spill_region(anchor_vertex, targets);
2534        if let Err(e) = plan_res {
2535            if let Some(id) = self.active_locks.remove(&anchor_vertex) {
2536                self.region_locks.release(id);
2537            }
2538            return Err(e);
2539        }
2540
2541        let commit_res = graph.commit_spill_region_atomic_with_fault(
2542            anchor_vertex,
2543            targets.to_vec(),
2544            rows,
2545            None,
2546        );
2547        if let Some(id) = self.active_locks.remove(&anchor_vertex) {
2548            self.region_locks.release(id);
2549        }
2550        commit_res.map(|_| ())
2551    }
2552
2553    /// Commit a spill and mirror all written cells into Arrow overlay via the owning engine.
2554    pub(crate) fn commit_array_with_overlay<R: EvaluationContext>(
2555        &mut self,
2556        engine: &mut Engine<R>,
2557        anchor_vertex: VertexId,
2558        targets: &[CellRef],
2559        rows: Vec<Vec<LiteralValue>>,
2560    ) -> Result<(), ExcelError> {
2561        // Re-run plan on concrete targets before committing to respect blockers.
2562        let plan_res = engine.graph.plan_spill_region(anchor_vertex, targets);
2563        if let Err(e) = plan_res {
2564            if let Some(id) = self.active_locks.remove(&anchor_vertex) {
2565                self.region_locks.release(id);
2566            }
2567            return Err(e);
2568        }
2569
2570        let commit_res = engine.graph.commit_spill_region_atomic_with_fault(
2571            anchor_vertex,
2572            targets.to_vec(),
2573            rows.clone(),
2574            None,
2575        );
2576        if let Some(id) = self.active_locks.remove(&anchor_vertex) {
2577            self.region_locks.release(id);
2578        }
2579        commit_res.map(|_| ())?;
2580
2581        // Mirror into Arrow overlay when enabled
2582        if engine.config.arrow_storage_enabled
2583            && engine.config.delta_overlay_enabled
2584            && engine.config.write_formula_overlay_enabled
2585        {
2586            // Expect targets to be a contiguous rectangle row-major starting at some anchor
2587            for (idx, cell) in targets.iter().enumerate() {
2588                let (r_off, c_off) = {
2589                    if rows.is_empty() || rows[0].is_empty() {
2590                        (0usize, 0usize)
2591                    } else {
2592                        let width = rows[0].len();
2593                        (idx / width, idx % width)
2594                    }
2595                };
2596                let v = rows
2597                    .get(r_off)
2598                    .and_then(|r| r.get(c_off))
2599                    .cloned()
2600                    .unwrap_or(LiteralValue::Empty);
2601                let sheet_name = engine.graph.sheet_name(cell.sheet_id).to_string();
2602                engine.mirror_value_to_overlay(
2603                    &sheet_name,
2604                    cell.coord.row() + 1,
2605                    cell.coord.col() + 1,
2606                    &v,
2607                );
2608            }
2609        }
2610        Ok(())
2611    }
2612}
2613
2614impl<R> Engine<R>
2615where
2616    R: EvaluationContext,
2617{
2618    fn resolve_shared_ref(
2619        &self,
2620        reference: &ReferenceType,
2621        current_sheet: &str,
2622    ) -> Result<formualizer_common::SheetRef<'static>, ExcelError> {
2623        use formualizer_common::{
2624            SheetCellRef as SharedCellRef, SheetLocator, SheetRangeRef as SharedRangeRef,
2625            SheetRef as SharedRef,
2626        };
2627
2628        // ReferenceType does not retain $ anchors. For evaluation and identity
2629        // we treat all coordinates as absolute to match graph cell addressing.
2630        // Anchor semantics are handled separately by rewrite/adjustment passes.
2631        let sr = match reference {
2632            ReferenceType::Cell { sheet, row, col } => {
2633                let row0 = row
2634                    .checked_sub(1)
2635                    .ok_or_else(|| ExcelError::new(ExcelErrorKind::Ref))?;
2636                let col0 = col
2637                    .checked_sub(1)
2638                    .ok_or_else(|| ExcelError::new(ExcelErrorKind::Ref))?;
2639                let sheet_loc = match sheet.as_deref() {
2640                    Some(name) => SheetLocator::from_name(name),
2641                    None => SheetLocator::Current,
2642                };
2643                let coord = formualizer_common::RelativeCoord::new(row0, col0, true, true);
2644                SharedRef::Cell(SharedCellRef::new(sheet_loc, coord))
2645            }
2646            ReferenceType::Range {
2647                sheet,
2648                start_row,
2649                start_col,
2650                end_row,
2651                end_col,
2652            } => {
2653                let sheet_loc = match sheet.as_deref() {
2654                    Some(name) => SheetLocator::from_name(name),
2655                    None => SheetLocator::Current,
2656                };
2657
2658                fn bound_from_1based(v: Option<u32>) -> Option<formualizer_common::AxisBound> {
2659                    v.and_then(|x| {
2660                        x.checked_sub(1)
2661                            .map(|i| formualizer_common::AxisBound::new(i, true))
2662                    })
2663                }
2664
2665                let sr = bound_from_1based(*start_row);
2666                if start_row.is_some() && sr.is_none() {
2667                    return Err(ExcelError::new(ExcelErrorKind::Ref));
2668                }
2669                let sc = bound_from_1based(*start_col);
2670                if start_col.is_some() && sc.is_none() {
2671                    return Err(ExcelError::new(ExcelErrorKind::Ref));
2672                }
2673                let er = bound_from_1based(*end_row);
2674                if end_row.is_some() && er.is_none() {
2675                    return Err(ExcelError::new(ExcelErrorKind::Ref));
2676                }
2677                let ec = bound_from_1based(*end_col);
2678                if end_col.is_some() && ec.is_none() {
2679                    return Err(ExcelError::new(ExcelErrorKind::Ref));
2680                }
2681
2682                let range = SharedRangeRef::from_parts(sheet_loc, sr, sc, er, ec)
2683                    .map_err(|_| ExcelError::new(ExcelErrorKind::Ref))?;
2684                SharedRef::Range(range)
2685            }
2686            _ => {
2687                return Err(ExcelError::new(ExcelErrorKind::Ref));
2688            }
2689        };
2690
2691        let current_id = self
2692            .graph
2693            .sheet_id(current_sheet)
2694            .ok_or_else(|| ExcelError::new(ExcelErrorKind::Ref))?;
2695
2696        let resolve_loc = |loc: SheetLocator<'_>| -> Result<SheetLocator<'static>, ExcelError> {
2697            match loc {
2698                SheetLocator::Current => Ok(SheetLocator::Id(current_id)),
2699                SheetLocator::Id(id) => Ok(SheetLocator::Id(id)),
2700                SheetLocator::Name(name) => {
2701                    let n = name.as_ref();
2702                    self.graph
2703                        .sheet_id(n)
2704                        .map(SheetLocator::Id)
2705                        .ok_or_else(|| ExcelError::new(ExcelErrorKind::Ref))
2706                }
2707            }
2708        };
2709
2710        match sr {
2711            SharedRef::Cell(cell) => {
2712                let owned = cell.into_owned();
2713                let sheet = resolve_loc(owned.sheet)?;
2714                Ok(SharedRef::Cell(SharedCellRef::new(sheet, owned.coord)))
2715            }
2716            SharedRef::Range(range) => {
2717                let owned = range.into_owned();
2718                let sheet = resolve_loc(owned.sheet)?;
2719                Ok(SharedRef::Range(SharedRangeRef {
2720                    sheet,
2721                    start_row: owned.start_row,
2722                    start_col: owned.start_col,
2723                    end_row: owned.end_row,
2724                    end_col: owned.end_col,
2725                }))
2726            }
2727        }
2728    }
2729}
2730
2731// Implement the resolver traits for the Engine.
2732// This allows the interpreter to resolve references by querying the engine's graph.
2733impl<R> crate::traits::ReferenceResolver for Engine<R>
2734where
2735    R: EvaluationContext,
2736{
2737    fn resolve_cell_reference(
2738        &self,
2739        sheet: Option<&str>,
2740        row: u32,
2741        col: u32,
2742    ) -> Result<LiteralValue, ExcelError> {
2743        let sheet_name = sheet.unwrap_or_else(|| self.default_sheet_name()); // FIXME: should use formula current-sheet context
2744        // Prefer engine's unified accessor which consults Arrow store for base values
2745        // and falls back to graph for formulas and stored values.
2746        if let Some(v) = self.get_cell_value(sheet_name, row, col) {
2747            Ok(v)
2748        } else {
2749            // Excel semantics: empty cell coerces to 0 in numeric contexts
2750            Ok(LiteralValue::Int(0))
2751        }
2752    }
2753}
2754
2755impl<R> crate::traits::RangeResolver for Engine<R>
2756where
2757    R: EvaluationContext,
2758{
2759    fn resolve_range_reference(
2760        &self,
2761        sheet: Option<&str>,
2762        sr: Option<u32>,
2763        sc: Option<u32>,
2764        er: Option<u32>,
2765        ec: Option<u32>,
2766    ) -> Result<Box<dyn crate::traits::Range>, ExcelError> {
2767        // For now, delegate range resolution to the external resolver.
2768        // A future optimization could be to handle this within the graph.
2769        self.resolver.resolve_range_reference(sheet, sr, sc, er, ec)
2770    }
2771}
2772
2773impl<R> crate::traits::NamedRangeResolver for Engine<R>
2774where
2775    R: EvaluationContext,
2776{
2777    fn resolve_named_range_reference(
2778        &self,
2779        name: &str,
2780    ) -> Result<Vec<Vec<LiteralValue>>, ExcelError> {
2781        self.resolver.resolve_named_range_reference(name)
2782    }
2783}
2784
2785impl<R> crate::traits::TableResolver for Engine<R>
2786where
2787    R: EvaluationContext,
2788{
2789    fn resolve_table_reference(
2790        &self,
2791        tref: &formualizer_parse::parser::TableReference,
2792    ) -> Result<Box<dyn crate::traits::Table>, ExcelError> {
2793        self.resolver.resolve_table_reference(tref)
2794    }
2795}
2796
2797// The Engine is a Resolver because it implements the constituent traits.
2798impl<R> crate::traits::Resolver for Engine<R> where R: EvaluationContext {}
2799
2800// The Engine provides functions by delegating to its internal resolver.
2801impl<R> crate::traits::FunctionProvider for Engine<R>
2802where
2803    R: EvaluationContext,
2804{
2805    fn get_function(
2806        &self,
2807        prefix: &str,
2808        name: &str,
2809    ) -> Option<std::sync::Arc<dyn crate::function::Function>> {
2810        self.resolver.get_function(prefix, name)
2811    }
2812}
2813
2814// Override EvaluationContext to provide thread pool access
2815impl<R> crate::traits::EvaluationContext for Engine<R>
2816where
2817    R: EvaluationContext,
2818{
2819    fn thread_pool(&self) -> Option<&Arc<rayon::ThreadPool>> {
2820        self.thread_pool.as_ref()
2821    }
2822
2823    fn cancellation_token(&self) -> Option<&std::sync::atomic::AtomicBool> {
2824        // Engine-wide cancellation is exposed via evaluate_all_cancellable APIs; default None here.
2825        None
2826    }
2827
2828    fn chunk_hint(&self) -> Option<usize> {
2829        // Use a simple heuristic from configuration (stripe width * height) as a default hint.
2830        let hint =
2831            (self.config.stripe_height as usize).saturating_mul(self.config.stripe_width as usize);
2832        Some(hint.clamp(1024, 1 << 20)) // clamp between 1K and ~1M
2833    }
2834
2835    fn volatile_level(&self) -> crate::traits::VolatileLevel {
2836        self.config.volatile_level
2837    }
2838
2839    fn workbook_seed(&self) -> u64 {
2840        self.config.workbook_seed
2841    }
2842
2843    fn recalc_epoch(&self) -> u64 {
2844        self.recalc_epoch
2845    }
2846
2847    fn used_rows_for_columns(
2848        &self,
2849        sheet: &str,
2850        start_col: u32,
2851        end_col: u32,
2852    ) -> Option<(u32, u32)> {
2853        // Prefer Arrow-backed used-region; fallback to graph if formulas intersect region
2854        let sheet_id = self.graph.sheet_id(sheet)?;
2855        let arrow_ok = self.sheet_store().sheet(sheet).is_some();
2856        if arrow_ok && let Some(bounds) = self.arrow_used_row_bounds(sheet, start_col, end_col) {
2857            return Some(bounds);
2858        }
2859        let sc0 = start_col.saturating_sub(1);
2860        let ec0 = end_col.saturating_sub(1);
2861        self.graph
2862            .used_row_bounds_for_columns(sheet_id, sc0, ec0)
2863            .map(|(a0, b0)| (a0 + 1, b0 + 1))
2864    }
2865
2866    fn used_cols_for_rows(&self, sheet: &str, start_row: u32, end_row: u32) -> Option<(u32, u32)> {
2867        // Prefer Arrow-backed used-region; fallback to graph if formulas intersect region
2868        let sheet_id = self.graph.sheet_id(sheet)?;
2869        let arrow_ok = self.sheet_store().sheet(sheet).is_some();
2870        if arrow_ok && let Some(bounds) = self.arrow_used_col_bounds(sheet, start_row, end_row) {
2871            return Some(bounds);
2872        }
2873        let sr0 = start_row.saturating_sub(1);
2874        let er0 = end_row.saturating_sub(1);
2875        self.graph
2876            .used_col_bounds_for_rows(sheet_id, sr0, er0)
2877            .map(|(a0, b0)| (a0 + 1, b0 + 1))
2878    }
2879
2880    fn sheet_bounds(&self, sheet: &str) -> Option<(u32, u32)> {
2881        let _ = self.graph.sheet_id(sheet)?;
2882        // Excel-like upper bounds; we expose something finite but large.
2883        // Backends may override with real bounds.
2884        Some((1_048_576, 16_384)) // 1048576 rows, 16384 cols (XFD)
2885    }
2886
2887    fn data_snapshot_id(&self) -> u64 {
2888        self.snapshot_id.load(std::sync::atomic::Ordering::Relaxed)
2889    }
2890
2891    fn backend_caps(&self) -> crate::traits::BackendCaps {
2892        crate::traits::BackendCaps {
2893            streaming: true,
2894            used_region: true,
2895            write: false,
2896            tables: false,
2897            async_stream: false,
2898        }
2899    }
2900
2901    // Flats removed
2902
2903    fn arrow_fastpath_enabled(&self) -> bool {
2904        self.config.arrow_fastpath_enabled
2905    }
2906
2907    fn date_system(&self) -> crate::engine::DateSystem {
2908        self.config.date_system
2909    }
2910    /// New: resolve a reference into a RangeView (Phase 2 API)
2911    fn resolve_range_view<'c>(
2912        &'c self,
2913        reference: &ReferenceType,
2914        current_sheet: &str,
2915    ) -> Result<RangeView<'c>, ExcelError> {
2916        match reference {
2917            ReferenceType::Range { .. } => {
2918                let shared = self.resolve_shared_ref(reference, current_sheet)?;
2919                let formualizer_common::SheetRef::Range(range) = shared else {
2920                    return Err(ExcelError::new(ExcelErrorKind::Ref));
2921                };
2922                let sheet_id = match range.sheet {
2923                    formualizer_common::SheetLocator::Id(id) => id,
2924                    _ => return Err(ExcelError::new(ExcelErrorKind::Ref)),
2925                };
2926                let sheet_name = self.graph.sheet_name(sheet_id);
2927
2928                let bounded_range = if range.start_row.is_some()
2929                    && range.start_col.is_some()
2930                    && range.end_row.is_some()
2931                    && range.end_col.is_some()
2932                {
2933                    Some(RangeRef::try_from_shared(range.as_ref())?)
2934                } else {
2935                    None
2936                };
2937
2938                let mut sr = bounded_range
2939                    .as_ref()
2940                    .map(|r| r.start.coord.row() + 1)
2941                    .or_else(|| range.start_row.map(|b| b.index + 1));
2942                let mut sc = bounded_range
2943                    .as_ref()
2944                    .map(|r| r.start.coord.col() + 1)
2945                    .or_else(|| range.start_col.map(|b| b.index + 1));
2946                let mut er = bounded_range
2947                    .as_ref()
2948                    .map(|r| r.end.coord.row() + 1)
2949                    .or_else(|| range.end_row.map(|b| b.index + 1));
2950                let mut ec = bounded_range
2951                    .as_ref()
2952                    .map(|r| r.end.coord.col() + 1)
2953                    .or_else(|| range.end_col.map(|b| b.index + 1));
2954
2955                if sr.is_none() && er.is_none() {
2956                    // Full-column reference: anchor at row 1
2957                    let scv = sc.unwrap_or(1);
2958                    let ecv = ec.unwrap_or(scv);
2959                    sr = Some(1);
2960                    if let Some((_, max_r)) = self.used_rows_for_columns(sheet_name, scv, ecv) {
2961                        er = Some(max_r);
2962                    } else if let Some((max_rows, _)) = self.sheet_bounds(sheet_name) {
2963                        er = Some(max_rows);
2964                    }
2965                }
2966                if sc.is_none() && ec.is_none() {
2967                    // Full-row reference: anchor at column 1
2968                    let srv = sr.unwrap_or(1);
2969                    let erv = er.unwrap_or(srv);
2970                    sc = Some(1);
2971                    if let Some((_, max_c)) = self.used_cols_for_rows(sheet_name, srv, erv) {
2972                        ec = Some(max_c);
2973                    } else if let Some((_, max_cols)) = self.sheet_bounds(sheet_name) {
2974                        ec = Some(max_cols);
2975                    }
2976                }
2977                if sr.is_some() && er.is_none() {
2978                    let scv = sc.unwrap_or(1);
2979                    let ecv = ec.unwrap_or(scv);
2980                    if let Some((_, max_r)) = self.used_rows_for_columns(sheet_name, scv, ecv) {
2981                        er = Some(max_r);
2982                    } else if let Some((max_rows, _)) = self.sheet_bounds(sheet_name) {
2983                        er = Some(max_rows);
2984                    }
2985                }
2986                if er.is_some() && sr.is_none() {
2987                    // Open start: anchor at row 1
2988                    sr = Some(1);
2989                }
2990                if sc.is_some() && ec.is_none() {
2991                    let srv = sr.unwrap_or(1);
2992                    let erv = er.unwrap_or(srv);
2993                    if let Some((_, max_c)) = self.used_cols_for_rows(sheet_name, srv, erv) {
2994                        ec = Some(max_c);
2995                    } else if let Some((_, max_cols)) = self.sheet_bounds(sheet_name) {
2996                        ec = Some(max_cols);
2997                    }
2998                }
2999                if ec.is_some() && sc.is_none() {
3000                    // Open start: anchor at column 1
3001                    sc = Some(1);
3002                }
3003
3004                let sr = sr.unwrap_or(1);
3005                let sc = sc.unwrap_or(1);
3006                let er = er.unwrap_or(sr.saturating_sub(1));
3007                let ec = ec.unwrap_or(sc.saturating_sub(1));
3008
3009                // Feature: Arrow-only RangeView resolution (no Hybrid/GraphSlice) when overlay mirroring is enabled.
3010                if self.config.arrow_storage_enabled
3011                    && self.config.delta_overlay_enabled
3012                    && self.config.write_formula_overlay_enabled
3013                {
3014                    if let Some(asheet) = self.sheet_store().sheet(sheet_name) {
3015                        let sr0 = sr.saturating_sub(1) as usize;
3016                        let sc0 = sc.saturating_sub(1) as usize;
3017                        let er0 = er.saturating_sub(1) as usize;
3018                        let ec0 = ec.saturating_sub(1) as usize;
3019                        let av = asheet.range_view(sr0, sc0, er0, ec0);
3020                        return Ok(RangeView::from_arrow(av));
3021                    }
3022                    // Materialize via graph when Arrow backend is absent.
3023                    let rows = (sr..=er)
3024                        .map(|r| {
3025                            (sc..=ec)
3026                                .map(|c| {
3027                                    let coord = Coord::from_excel(r, c, true, true);
3028                                    let addr = CellRef::new(sheet_id, coord);
3029                                    self.graph
3030                                        .get_vertex_id_for_address(&addr)
3031                                        .and_then(|id| self.graph.get_value(*id))
3032                                        .unwrap_or(LiteralValue::Empty)
3033                                })
3034                                .collect::<Vec<_>>()
3035                        })
3036                        .collect::<Vec<_>>();
3037                    return Ok(RangeView::from_borrowed(Box::leak(Box::new(rows))));
3038                }
3039
3040                // Default behavior (no feature): prefer Hybrid when Arrow exists, otherwise GraphSlice
3041                if let Some(asheet) = self.sheet_store().sheet(sheet_name) {
3042                    let sr0 = sr.saturating_sub(1) as usize;
3043                    let sc0 = sc.saturating_sub(1) as usize;
3044                    let er0 = er.saturating_sub(1) as usize;
3045                    let ec0 = ec.saturating_sub(1) as usize;
3046                    let av = asheet.range_view(sr0, sc0, er0, ec0);
3047                    return Ok(RangeView::from_hybrid(
3048                        &self.graph,
3049                        sheet_id,
3050                        sr.saturating_sub(1),
3051                        sc.saturating_sub(1),
3052                        av,
3053                    ));
3054                }
3055                Ok(RangeView::from_graph(
3056                    &self.graph,
3057                    sheet_id,
3058                    sr.saturating_sub(1),
3059                    sc.saturating_sub(1),
3060                    er.saturating_sub(1),
3061                    ec.saturating_sub(1),
3062                ))
3063            }
3064            ReferenceType::Cell { .. } => {
3065                let shared = self.resolve_shared_ref(reference, current_sheet)?;
3066                let formualizer_common::SheetRef::Cell(cell) = shared else {
3067                    return Err(ExcelError::new(ExcelErrorKind::Ref));
3068                };
3069                let addr = CellRef::try_from_shared(cell)?;
3070                let sheet_id = addr.sheet_id;
3071                let sheet_name = self.graph.sheet_name(sheet_id);
3072                let row = addr.coord.row() + 1;
3073                let col = addr.coord.col() + 1;
3074                if let Some(vid) = self.graph.get_vertex_id_for_address(&addr)
3075                    && matches!(
3076                        self.graph.get_vertex_kind(*vid),
3077                        VertexKind::FormulaScalar | VertexKind::FormulaArray
3078                    )
3079                    && let Some(v) = self.graph.get_cell_value(sheet_name, row, col)
3080                {
3081                    return Ok(RangeView::from_borrowed(Box::leak(Box::new(vec![vec![v]]))));
3082                }
3083
3084                if self.config.arrow_storage_enabled
3085                    && self.config.delta_overlay_enabled
3086                    && self.config.write_formula_overlay_enabled
3087                {
3088                    if let Some(asheet) = self.sheet_store().sheet(sheet_name) {
3089                        let r0 = row.saturating_sub(1) as usize;
3090                        let c0 = col.saturating_sub(1) as usize;
3091                        let av = asheet.range_view(r0, c0, r0, c0);
3092                        let v = av.get_cell(0, 0);
3093                        return Ok(RangeView::from_borrowed(Box::leak(Box::new(vec![vec![v]]))));
3094                    }
3095                    let v = self
3096                        .graph
3097                        .get_cell_value(sheet_name, row, col)
3098                        .unwrap_or(LiteralValue::Empty);
3099                    return Ok(RangeView::from_borrowed(Box::leak(Box::new(vec![vec![v]]))));
3100                }
3101
3102                if let Some(v) = self.graph.get_cell_value(sheet_name, row, col) {
3103                    return Ok(RangeView::from_borrowed(Box::leak(Box::new(vec![vec![v]]))));
3104                }
3105                if let Some(asheet) = self.sheet_store().sheet(sheet_name) {
3106                    let r0 = row.saturating_sub(1) as usize;
3107                    let c0 = col.saturating_sub(1) as usize;
3108                    let av = asheet.range_view(r0, c0, r0, c0);
3109                    let v = av.get_cell(0, 0);
3110                    return Ok(RangeView::from_borrowed(Box::leak(Box::new(vec![vec![v]]))));
3111                }
3112                Ok(RangeView::from_borrowed(Box::leak(Box::new(vec![vec![
3113                    LiteralValue::Empty,
3114                ]]))))
3115            }
3116            ReferenceType::NamedRange(name) => {
3117                if let Some(current_id) = self.graph.sheet_id(current_sheet)
3118                    && let Some(named) = self.graph.resolve_name_entry(name, current_id)
3119                {
3120                    match &named.definition {
3121                        NamedDefinition::Cell(cell_ref) => {
3122                            let sheet_name = self.graph.sheet_name(cell_ref.sheet_id);
3123                            let value = self
3124                                .graph
3125                                .get_cell_value(
3126                                    sheet_name,
3127                                    cell_ref.coord.row(),
3128                                    cell_ref.coord.col(),
3129                                )
3130                                .unwrap_or(LiteralValue::Empty);
3131                            let data = vec![vec![value]];
3132                            return Ok(RangeView::from_borrowed(Box::leak(Box::new(data))));
3133                        }
3134                        NamedDefinition::Range(range_ref) => {
3135                            let sheet_name = self.graph.sheet_name(range_ref.start.sheet_id);
3136                            let mut rows = Vec::new();
3137                            for row in range_ref.start.coord.row()..=range_ref.end.coord.row() {
3138                                let mut line = Vec::new();
3139                                for col in range_ref.start.coord.col()..=range_ref.end.coord.col() {
3140                                    let value = self
3141                                        .graph
3142                                        .get_cell_value(sheet_name, row, col)
3143                                        .unwrap_or(LiteralValue::Empty);
3144                                    line.push(value);
3145                                }
3146                                rows.push(line);
3147                            }
3148                            return Ok(RangeView::from_borrowed(Box::leak(Box::new(rows))));
3149                        }
3150                        NamedDefinition::Formula { .. } => {
3151                            if let Some(value) = self.graph.get_value(named.vertex) {
3152                                let data = vec![vec![value]];
3153                                return Ok(RangeView::from_borrowed(Box::leak(Box::new(data))));
3154                            }
3155                        }
3156                    }
3157                }
3158                let data = self.resolver.resolve_named_range_reference(name)?;
3159                Ok(RangeView::from_borrowed(Box::leak(Box::new(data))))
3160            }
3161            ReferenceType::Table(tref) => {
3162                // Materialize via Resolver::resolve_range_like tranche 1
3163                let boxed = self.resolve_range_like(&ReferenceType::Table(tref.clone()))?;
3164                {
3165                    let owned = boxed.materialise().into_owned();
3166                    Ok(RangeView::from_borrowed(Box::leak(Box::new(owned))))
3167                }
3168            }
3169        }
3170    }
3171
3172    fn build_criteria_mask(
3173        &self,
3174        view: &crate::arrow_store::ArrowRangeView<'_>,
3175        col_in_view: usize,
3176        pred: &crate::args::CriteriaPredicate,
3177    ) -> Option<std::sync::Arc<arrow_array::BooleanArray>> {
3178        if view.dims().0 == 0 || view.dims().1 == 0 {
3179            return None;
3180        }
3181        compute_criteria_mask(view, col_in_view, pred)
3182    }
3183}
3184
3185impl<R> Engine<R>
3186where
3187    R: EvaluationContext,
3188{
3189    /// Helper: commit spill via shim and mirror resulting cells into Arrow overlay when enabled.
3190    fn commit_spill_and_mirror(
3191        &mut self,
3192        anchor_vertex: VertexId,
3193        targets: &[CellRef],
3194        rows: Vec<Vec<LiteralValue>>,
3195    ) -> Result<(), ExcelError> {
3196        // Commit via shim (releases locks)
3197        self.spill_mgr
3198            .commit_array(&mut self.graph, anchor_vertex, targets, rows.clone())?;
3199
3200        if self.config.arrow_storage_enabled
3201            && self.config.delta_overlay_enabled
3202            && self.config.write_formula_overlay_enabled
3203        {
3204            for (idx, cell) in targets.iter().enumerate() {
3205                if rows.is_empty() || rows[0].is_empty() {
3206                    break;
3207                }
3208                let width = rows[0].len();
3209                let r_off = idx / width;
3210                let c_off = idx % width;
3211                let v = rows[r_off][c_off].clone();
3212                let sheet_name = self.graph.sheet_name(cell.sheet_id).to_string();
3213                self.mirror_value_to_overlay(
3214                    &sheet_name,
3215                    cell.coord.row() + 1,
3216                    cell.coord.col() + 1,
3217                    &v,
3218                );
3219            }
3220        }
3221        Ok(())
3222    }
3223}