Skip to main content

formualizer_eval/engine/
eval.rs

1use crate::SheetId;
2use crate::arrow_store::SheetStore;
3use crate::engine::eval_delta::{DeltaCollector, DeltaMode, EvalDelta};
4use crate::engine::named_range::{NameScope, NamedDefinition};
5use crate::engine::range_view::RangeView;
6use crate::engine::spill::{RegionLockManager, SpillMeta, SpillShape};
7use crate::engine::{DependencyGraph, EvalConfig, Scheduler, VertexId, VertexKind};
8use crate::interpreter::Interpreter;
9use crate::reference::{CellRef, Coord, RangeRef};
10use crate::traits::FunctionProvider;
11use crate::traits::{EvaluationContext, Resolver};
12use chrono::Timelike;
13use formualizer_common::{col_letters_from_1based, parse_a1_1based};
14use formualizer_parse::parser::ReferenceType;
15use formualizer_parse::{ASTNode, ExcelError, ExcelErrorKind, LiteralValue};
16use rayon::ThreadPoolBuilder;
17use rustc_hash::{FxHashMap, FxHashSet};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicBool, Ordering};
20
21pub struct Engine<R> {
22    pub(crate) graph: DependencyGraph,
23    resolver: R,
24    pub config: EvalConfig,
25    thread_pool: Option<Arc<rayon::ThreadPool>>,
26    pub recalc_epoch: u64,
27    snapshot_id: std::sync::atomic::AtomicU64,
28    spill_mgr: ShimSpillManager,
29    /// Arrow-backed storage for sheet values (Phase A)
30    arrow_sheets: SheetStore,
31    /// True if any edit after bulk load; disables Arrow reads for parity
32    has_edited: bool,
33    /// Overlay compaction counter (Phase C instrumentation)
34    overlay_compactions: u64,
35    // Pass-scoped cache for Arrow used-row bounds per column
36    row_bounds_cache: std::sync::RwLock<Option<RowBoundsCache>>,
37    source_cache: Arc<std::sync::RwLock<SourceCache>>,
38    /// Staged formulas by sheet when `defer_graph_building` is enabled.
39    staged_formulas: std::collections::HashMap<String, Vec<(u32, u32, String)>>,
40    /// Transient cancellation flag used during evaluation
41    active_cancel_flag: Option<Arc<AtomicBool>>,
42}
43
44#[derive(Default)]
45struct SourceCache {
46    scalars: FxHashMap<(String, Option<u64>), LiteralValue>,
47    tables: FxHashMap<(String, Option<u64>), Arc<dyn crate::traits::Table>>,
48}
49
50struct SourceCacheSession {
51    cache: Arc<std::sync::RwLock<SourceCache>>,
52}
53
54impl Drop for SourceCacheSession {
55    fn drop(&mut self) {
56        if let Ok(mut g) = self.cache.write() {
57            *g = SourceCache::default();
58        }
59    }
60}
61
62#[derive(Debug)]
63pub struct EvalResult {
64    pub computed_vertices: usize,
65    pub cycle_errors: usize,
66    pub elapsed: std::time::Duration,
67}
68
69/// Cached evaluation schedule that can be replayed across multiple recalculations.
70#[derive(Debug)]
71pub struct RecalcPlan {
72    schedule: crate::engine::Schedule,
73}
74
75impl RecalcPlan {
76    pub fn layer_count(&self) -> usize {
77        self.schedule.layers.len()
78    }
79}
80
81#[cfg(test)]
82pub(crate) mod criteria_mask_test_hooks {
83    use std::cell::Cell;
84
85    thread_local! {
86        static TEXT_SEGMENTS_TOTAL: Cell<usize> = const { Cell::new(0) };
87        static TEXT_SEGMENTS_ALL_NULL: Cell<usize> = const { Cell::new(0) };
88    }
89
90    pub fn reset_text_segment_counters() {
91        TEXT_SEGMENTS_TOTAL.with(|c| c.set(0));
92        TEXT_SEGMENTS_ALL_NULL.with(|c| c.set(0));
93    }
94
95    pub fn text_segment_counters() -> (usize, usize) {
96        let a = TEXT_SEGMENTS_TOTAL.with(|c| c.get());
97        let b = TEXT_SEGMENTS_ALL_NULL.with(|c| c.get());
98        (a, b)
99    }
100
101    pub(crate) fn inc_total() {
102        TEXT_SEGMENTS_TOTAL.with(|c| c.set(c.get() + 1));
103    }
104    pub(crate) fn inc_all_null() {
105        TEXT_SEGMENTS_ALL_NULL.with(|c| c.set(c.get() + 1));
106    }
107}
108
109fn compute_criteria_mask(
110    view: &RangeView<'_>,
111    col_in_view: usize,
112    pred: &crate::args::CriteriaPredicate,
113) -> Option<std::sync::Arc<arrow_array::BooleanArray>> {
114    use crate::compute_prelude::{boolean, cmp, concat_arrays};
115    use arrow::compute::kernels::comparison::{ilike, nilike};
116    use arrow_array::{
117        Array as _, ArrayRef, BooleanArray, Float64Array, StringArray, builder::BooleanBuilder,
118    };
119
120    // Helper: apply a numeric predicate to a single Float64Array chunk
121    fn apply_numeric_pred(
122        chunk: &Float64Array,
123        pred: &crate::args::CriteriaPredicate,
124    ) -> Option<BooleanArray> {
125        match pred {
126            crate::args::CriteriaPredicate::Gt(n) => {
127                cmp::gt(chunk, &Float64Array::new_scalar(*n)).ok()
128            }
129            crate::args::CriteriaPredicate::Ge(n) => {
130                cmp::gt_eq(chunk, &Float64Array::new_scalar(*n)).ok()
131            }
132            crate::args::CriteriaPredicate::Lt(n) => {
133                cmp::lt(chunk, &Float64Array::new_scalar(*n)).ok()
134            }
135            crate::args::CriteriaPredicate::Le(n) => {
136                cmp::lt_eq(chunk, &Float64Array::new_scalar(*n)).ok()
137            }
138            crate::args::CriteriaPredicate::Eq(v) => match v {
139                formualizer_common::LiteralValue::Number(x) => {
140                    cmp::eq(chunk, &Float64Array::new_scalar(*x)).ok()
141                }
142                formualizer_common::LiteralValue::Int(i) => {
143                    cmp::eq(chunk, &Float64Array::new_scalar(*i as f64)).ok()
144                }
145                _ => None,
146            },
147            crate::args::CriteriaPredicate::Ne(v) => match v {
148                formualizer_common::LiteralValue::Number(x) => {
149                    cmp::neq(chunk, &Float64Array::new_scalar(*x)).ok()
150                }
151                formualizer_common::LiteralValue::Int(i) => {
152                    cmp::neq(chunk, &Float64Array::new_scalar(*i as f64)).ok()
153                }
154                _ => None,
155            },
156            _ => None,
157        }
158    }
159
160    // Check if this is a numeric predicate that can be applied per-chunk
161    let is_numeric_pred = matches!(
162        pred,
163        crate::args::CriteriaPredicate::Gt(_)
164            | crate::args::CriteriaPredicate::Ge(_)
165            | crate::args::CriteriaPredicate::Lt(_)
166            | crate::args::CriteriaPredicate::Le(_)
167            | crate::args::CriteriaPredicate::Eq(formualizer_common::LiteralValue::Number(_))
168            | crate::args::CriteriaPredicate::Eq(formualizer_common::LiteralValue::Int(_))
169            | crate::args::CriteriaPredicate::Ne(formualizer_common::LiteralValue::Number(_))
170            | crate::args::CriteriaPredicate::Ne(formualizer_common::LiteralValue::Int(_))
171    );
172
173    // OPTIMIZED PATH: For numeric predicates, apply per-chunk and concatenate boolean masks.
174    // This avoids materializing the full numeric column (64-bit per element) and instead
175    // concatenates boolean masks (1-bit per element) - a 64x memory reduction.
176    if is_numeric_pred {
177        let mut bool_parts: Vec<BooleanArray> = Vec::new();
178        for res in view.numbers_slices() {
179            let (_rs, _rl, cols_seg) = res.ok()?;
180            if col_in_view < cols_seg.len() {
181                let chunk = cols_seg[col_in_view].as_ref();
182                let mask = apply_numeric_pred(chunk, pred)?;
183                bool_parts.push(mask);
184            }
185        }
186
187        if bool_parts.is_empty() {
188            return None;
189        } else if bool_parts.len() == 1 {
190            return Some(std::sync::Arc::new(bool_parts.remove(0)));
191        } else {
192            // Concatenate boolean masks (much cheaper than concatenating Float64 arrays)
193            let anys: Vec<&dyn arrow_array::Array> = bool_parts
194                .iter()
195                .map(|a| a as &dyn arrow_array::Array)
196                .collect();
197            let conc: ArrayRef = concat_arrays(&anys).ok()?;
198            let ba = conc.as_any().downcast_ref::<BooleanArray>()?.clone();
199            return Some(std::sync::Arc::new(ba));
200        }
201    }
202
203    // TEXT PATH: build masks per row-chunk using lowered text slices.
204    // This avoids concatenating full-string columns just to compute a boolean mask.
205    let (text_kind, text_pat, empty_special) = match pred {
206        crate::args::CriteriaPredicate::Eq(formualizer_common::LiteralValue::Text(t)) => {
207            (0u8, t.to_ascii_lowercase(), t.is_empty())
208        }
209        crate::args::CriteriaPredicate::Ne(formualizer_common::LiteralValue::Text(t)) => {
210            (1u8, t.to_ascii_lowercase(), false)
211        }
212        crate::args::CriteriaPredicate::TextLike {
213            pattern,
214            case_insensitive,
215        } => {
216            let p = if *case_insensitive {
217                pattern.to_ascii_lowercase()
218            } else {
219                pattern.clone()
220            };
221            (2u8, p.replace('*', "%").replace('?', "_"), false)
222        }
223        _ => return None,
224    };
225
226    let pat = StringArray::new_scalar(text_pat);
227    let mut bool_parts: Vec<BooleanArray> = Vec::new();
228
229    for res in view.iter_row_chunks() {
230        let cs = res.ok()?;
231        if cs.row_len == 0 {
232            continue;
233        }
234        #[cfg(test)]
235        criteria_mask_test_hooks::inc_total();
236
237        let slices = view.slice_lowered_text(cs.row_start, cs.row_len);
238        if col_in_view >= slices.len() {
239            return None;
240        }
241
242        let seg_opt = slices[col_in_view].as_ref().map(|a| a.as_ref());
243        let seg = match seg_opt {
244            Some(s) => s,
245            None => {
246                #[cfg(test)]
247                criteria_mask_test_hooks::inc_all_null();
248                if text_kind == 0 && empty_special {
249                    // Eq("") treats nulls (Empty) as equal.
250                    let mut bb = BooleanBuilder::with_capacity(cs.row_len);
251                    bb.append_n(cs.row_len, true);
252                    bool_parts.push(bb.finish());
253                } else {
254                    // For non-empty patterns, ilike/nilike return null on null inputs.
255                    bool_parts.push(BooleanArray::new_null(cs.row_len));
256                }
257                continue;
258            }
259        };
260
261        let seg_sa = seg.as_any().downcast_ref::<StringArray>()?;
262        let mut m = match text_kind {
263            0 => ilike(seg_sa, &pat).ok()?,
264            1 => nilike(seg_sa, &pat).ok()?,
265            2 => ilike(seg_sa, &pat).ok()?,
266            _ => return None,
267        };
268
269        if text_kind == 0 && empty_special {
270            // Treat nulls as equal to empty string
271            let mut bb = BooleanBuilder::with_capacity(seg_sa.len());
272            for i in 0..seg_sa.len() {
273                bb.append_value(seg_sa.is_null(i));
274            }
275            let nulls = bb.finish();
276            m = boolean::or_kleene(&m, &nulls).ok()?;
277        }
278
279        bool_parts.push(m);
280    }
281
282    if bool_parts.is_empty() {
283        None
284    } else if bool_parts.len() == 1 {
285        Some(std::sync::Arc::new(bool_parts.remove(0)))
286    } else {
287        let anys: Vec<&dyn arrow_array::Array> = bool_parts
288            .iter()
289            .map(|a| a as &dyn arrow_array::Array)
290            .collect();
291        let conc: ArrayRef = concat_arrays(&anys).ok()?;
292        let ba = conc.as_any().downcast_ref::<BooleanArray>()?.clone();
293        Some(std::sync::Arc::new(ba))
294    }
295}
296
297#[derive(Debug, Clone)]
298pub struct LayerInfo {
299    pub vertex_count: usize,
300    pub parallel_eligible: bool,
301    pub sample_cells: Vec<String>, // Sample of up to 5 cell addresses
302}
303
304#[derive(Debug, Clone)]
305pub struct EvalPlan {
306    pub total_vertices_to_evaluate: usize,
307    pub layers: Vec<LayerInfo>,
308    pub cycles_detected: usize,
309    pub dirty_count: usize,
310    pub volatile_count: usize,
311    pub parallel_enabled: bool,
312    pub estimated_parallel_layers: usize,
313    pub target_cells: Vec<String>,
314}
315
316impl<R> Engine<R>
317where
318    R: EvaluationContext,
319{
320    pub fn new(resolver: R, config: EvalConfig) -> Self {
321        crate::builtins::load_builtins();
322
323        // Initialize thread pool based on config
324        let thread_pool = if config.enable_parallel {
325            let mut builder = ThreadPoolBuilder::new();
326            if let Some(max_threads) = config.max_threads {
327                builder = builder.num_threads(max_threads);
328            }
329
330            match builder.build() {
331                Ok(pool) => Some(Arc::new(pool)),
332                Err(_) => {
333                    // Fall back to sequential evaluation if thread pool creation fails
334                    None
335                }
336            }
337        } else {
338            None
339        };
340
341        let mut engine = Self {
342            graph: DependencyGraph::new_with_config(config.clone()),
343            resolver,
344            config,
345            thread_pool,
346            recalc_epoch: 0,
347            snapshot_id: std::sync::atomic::AtomicU64::new(1),
348            spill_mgr: ShimSpillManager::default(),
349            arrow_sheets: SheetStore::default(),
350            has_edited: false,
351            overlay_compactions: 0,
352            row_bounds_cache: std::sync::RwLock::new(None),
353            source_cache: Arc::new(std::sync::RwLock::new(SourceCache::default())),
354            staged_formulas: std::collections::HashMap::new(),
355            active_cancel_flag: None,
356        };
357        let default_sheet = engine.graph.default_sheet_name().to_string();
358        engine.ensure_arrow_sheet(&default_sheet);
359        engine
360    }
361
362    /// Create an Engine with a custom thread pool (for shared thread pool scenarios)
363    pub fn with_thread_pool(
364        resolver: R,
365        config: EvalConfig,
366        thread_pool: Arc<rayon::ThreadPool>,
367    ) -> Self {
368        crate::builtins::load_builtins();
369        let mut engine = Self {
370            graph: DependencyGraph::new_with_config(config.clone()),
371            resolver,
372            config,
373            thread_pool: Some(thread_pool),
374            recalc_epoch: 0,
375            snapshot_id: std::sync::atomic::AtomicU64::new(1),
376            spill_mgr: ShimSpillManager::default(),
377            arrow_sheets: SheetStore::default(),
378            has_edited: false,
379            overlay_compactions: 0,
380            row_bounds_cache: std::sync::RwLock::new(None),
381            source_cache: Arc::new(std::sync::RwLock::new(SourceCache::default())),
382            staged_formulas: std::collections::HashMap::new(),
383            active_cancel_flag: None,
384        };
385        let default_sheet = engine.graph.default_sheet_name().to_string();
386        engine.ensure_arrow_sheet(&default_sheet);
387        engine
388    }
389
390    fn clear_source_cache(&self) {
391        if let Ok(mut g) = self.source_cache.write() {
392            *g = SourceCache::default();
393        }
394    }
395
396    fn source_cache_session(&self) -> SourceCacheSession {
397        self.clear_source_cache();
398        SourceCacheSession {
399            cache: self.source_cache.clone(),
400        }
401    }
402
403    fn resolve_source_scalar_cached(
404        &self,
405        name: &str,
406        version: Option<u64>,
407    ) -> Result<LiteralValue, ExcelError> {
408        let key = (name.to_string(), version);
409        if let Ok(mut g) = self.source_cache.write() {
410            if let Some(v) = g.scalars.get(&key) {
411                return Ok(v.clone());
412            }
413
414            let v = self.resolver.resolve_source_scalar(name).map_err(|err| {
415                if matches!(err.kind, ExcelErrorKind::Name | ExcelErrorKind::NImpl) {
416                    ExcelError::new(ExcelErrorKind::Ref)
417                        .with_message(format!("Unresolved source scalar: {name}"))
418                } else {
419                    err
420                }
421            })?;
422            g.scalars.insert(key, v.clone());
423            Ok(v)
424        } else {
425            self.resolver.resolve_source_scalar(name).map_err(|err| {
426                if matches!(err.kind, ExcelErrorKind::Name | ExcelErrorKind::NImpl) {
427                    ExcelError::new(ExcelErrorKind::Ref)
428                        .with_message(format!("Unresolved source scalar: {name}"))
429                } else {
430                    err
431                }
432            })
433        }
434    }
435
436    fn resolve_source_table_cached(
437        &self,
438        name: &str,
439        version: Option<u64>,
440    ) -> Result<Arc<dyn crate::traits::Table>, ExcelError> {
441        let key = (name.to_string(), version);
442        if let Ok(mut g) = self.source_cache.write() {
443            if let Some(t) = g.tables.get(&key) {
444                return Ok(t.clone());
445            }
446
447            let t = self.resolver.resolve_source_table(name).map_err(|err| {
448                if matches!(err.kind, ExcelErrorKind::Name | ExcelErrorKind::NImpl) {
449                    ExcelError::new(ExcelErrorKind::Ref)
450                        .with_message(format!("Unresolved source table: {name}"))
451                } else {
452                    err
453                }
454            })?;
455            let t: Arc<dyn crate::traits::Table> = Arc::from(t);
456            g.tables.insert(key, t.clone());
457            Ok(t)
458        } else {
459            self.resolver
460                .resolve_source_table(name)
461                .map_err(|err| {
462                    if matches!(err.kind, ExcelErrorKind::Name | ExcelErrorKind::NImpl) {
463                        ExcelError::new(ExcelErrorKind::Ref)
464                            .with_message(format!("Unresolved source table: {name}"))
465                    } else {
466                        err
467                    }
468                })
469                .map(Arc::from)
470        }
471    }
472
473    fn source_table_to_range_view(
474        &self,
475        table: &dyn crate::traits::Table,
476        spec: &Option<formualizer_parse::parser::TableSpecifier>,
477    ) -> Result<RangeView<'static>, ExcelError> {
478        use formualizer_parse::parser::{SpecialItem, TableSpecifier};
479
480        let owned = match spec {
481            Some(TableSpecifier::Column(c)) => {
482                let c = c.trim();
483                if c == "@" || c.contains('[') || c.contains(']') || c.contains(',') {
484                    return Err(ExcelError::new(ExcelErrorKind::NImpl).with_message(
485                        "Complex structured references not yet supported".to_string(),
486                    ));
487                }
488                table.get_column(c)?.materialise().into_owned()
489            }
490            Some(TableSpecifier::ColumnRange(start, end)) => {
491                let cols = table.columns();
492                let start = start.trim();
493                let end = end.trim();
494                let start_idx = cols.iter().position(|n| n.eq_ignore_ascii_case(start));
495                let end_idx = cols.iter().position(|n| n.eq_ignore_ascii_case(end));
496                if let (Some(mut si), Some(mut ei)) = (start_idx, end_idx) {
497                    if si > ei {
498                        std::mem::swap(&mut si, &mut ei);
499                    }
500                    let h = table.data_height();
501                    let w = ei - si + 1;
502                    let mut rows = vec![vec![LiteralValue::Empty; w]; h];
503                    for (offset, ci) in (si..=ei).enumerate() {
504                        let cname = &cols[ci];
505                        let col_range = table.get_column(cname)?;
506                        let (rh, _) = col_range.dimensions();
507                        for (r, row) in rows.iter_mut().enumerate().take(h.min(rh)) {
508                            row[offset] = col_range.get(r, 0)?;
509                        }
510                    }
511                    rows
512                } else {
513                    return Err(ExcelError::new(ExcelErrorKind::Ref)
514                        .with_message("Column range refers to unknown column(s)".to_string()));
515                }
516            }
517            Some(TableSpecifier::SpecialItem(SpecialItem::Headers))
518            | Some(TableSpecifier::Headers) => table
519                .headers_row()
520                .map(|r| r.materialise().into_owned())
521                .unwrap_or_default(),
522            Some(TableSpecifier::SpecialItem(SpecialItem::Totals))
523            | Some(TableSpecifier::Totals) => table
524                .totals_row()
525                .map(|r| r.materialise().into_owned())
526                .unwrap_or_default(),
527            Some(TableSpecifier::SpecialItem(SpecialItem::Data)) | Some(TableSpecifier::Data) => {
528                table
529                    .data_body()
530                    .map(|r| r.materialise().into_owned())
531                    .unwrap_or_default()
532            }
533            Some(TableSpecifier::SpecialItem(SpecialItem::All)) | Some(TableSpecifier::All) => {
534                let mut out: Vec<Vec<LiteralValue>> = Vec::new();
535                if let Some(h) = table.headers_row() {
536                    out.extend(h.iter_rows());
537                }
538                if let Some(body) = table.data_body() {
539                    out.extend(body.iter_rows());
540                }
541                if let Some(tr) = table.totals_row() {
542                    out.extend(tr.iter_rows());
543                }
544                out
545            }
546            Some(TableSpecifier::SpecialItem(SpecialItem::ThisRow)) => {
547                return Err(ExcelError::new(ExcelErrorKind::NImpl).with_message(
548                    "@ (This Row) requires table-aware context; not yet supported".to_string(),
549                ));
550            }
551            Some(TableSpecifier::Row(_)) | Some(TableSpecifier::Combination(_)) => {
552                return Err(ExcelError::new(ExcelErrorKind::NImpl)
553                    .with_message("Complex structured references not yet supported".to_string()));
554            }
555            None => {
556                return Err(ExcelError::new(ExcelErrorKind::NImpl)
557                    .with_message("Table reference without specifier is unsupported".to_string()));
558            }
559        };
560
561        Ok(RangeView::from_owned_rows(owned, self.config.date_system))
562    }
563
564    pub fn default_sheet_id(&self) -> SheetId {
565        self.graph.default_sheet_id()
566    }
567
568    pub fn default_sheet_name(&self) -> &str {
569        self.graph.default_sheet_name()
570    }
571
572    /// Update the workbook seed for deterministic RNGs in functions.
573    pub fn set_workbook_seed(&mut self, seed: u64) {
574        self.config.workbook_seed = seed;
575    }
576
577    /// Set the volatile level policy (Always/OnRecalc/OnOpen)
578    pub fn set_volatile_level(&mut self, level: crate::traits::VolatileLevel) {
579        self.config.volatile_level = level;
580    }
581
582    pub fn sheet_id(&self, name: &str) -> Option<SheetId> {
583        self.graph.sheet_id(name)
584    }
585
586    pub fn sheet_id_mut(&mut self, name: &str) -> SheetId {
587        self.add_sheet(name)
588            .unwrap_or_else(|_| self.graph.sheet_id_mut(name))
589    }
590
591    pub fn sheet_name(&self, id: SheetId) -> &str {
592        self.graph.sheet_name(id)
593    }
594
595    pub fn add_sheet(&mut self, name: &str) -> Result<SheetId, ExcelError> {
596        let id = self.graph.add_sheet(name)?;
597        self.ensure_arrow_sheet(name);
598        Ok(id)
599    }
600
601    fn ensure_arrow_sheet(&mut self, name: &str) {
602        if self.arrow_sheets.sheet(name).is_some() {
603            return;
604        }
605        self.arrow_sheets
606            .sheets
607            .push(crate::arrow_store::ArrowSheet {
608                name: std::sync::Arc::<str>::from(name),
609                columns: Vec::new(),
610                nrows: 0,
611                chunk_starts: Vec::new(),
612                chunk_rows: 32 * 1024,
613            });
614    }
615
616    pub fn remove_sheet(&mut self, sheet_id: SheetId) -> Result<(), ExcelError> {
617        let name = self.graph.sheet_name(sheet_id).to_string();
618        self.graph.remove_sheet(sheet_id)?;
619        self.arrow_sheets.sheets.retain(|s| s.name.as_ref() != name);
620        Ok(())
621    }
622
623    pub fn rename_sheet(&mut self, sheet_id: SheetId, new_name: &str) -> Result<(), ExcelError> {
624        let old_name = self.graph.sheet_name(sheet_id).to_string();
625        self.graph.rename_sheet(sheet_id, new_name)?;
626        self.ensure_arrow_sheet(&old_name);
627        if let Some(asheet) = self.arrow_sheets.sheet_mut(&old_name) {
628            asheet.name = std::sync::Arc::<str>::from(new_name);
629        }
630        Ok(())
631    }
632
633    pub fn named_ranges_iter(
634        &self,
635    ) -> impl Iterator<Item = (&String, &crate::engine::named_range::NamedRange)> {
636        self.graph.named_ranges_iter()
637    }
638
639    pub fn sheet_named_ranges_iter(
640        &self,
641    ) -> impl Iterator<Item = (&(SheetId, String), &crate::engine::named_range::NamedRange)> {
642        self.graph.sheet_named_ranges_iter()
643    }
644
645    pub fn resolve_name_entry(
646        &self,
647        name: &str,
648        current_sheet: SheetId,
649    ) -> Option<&crate::engine::named_range::NamedRange> {
650        self.graph.resolve_name_entry(name, current_sheet)
651    }
652
653    pub fn define_name(
654        &mut self,
655        name: &str,
656        definition: NamedDefinition,
657        scope: NameScope,
658    ) -> Result<(), ExcelError> {
659        self.graph.define_name(name, definition, scope)
660    }
661
662    pub fn update_name(
663        &mut self,
664        name: &str,
665        definition: NamedDefinition,
666        scope: NameScope,
667    ) -> Result<(), ExcelError> {
668        self.graph.update_name(name, definition, scope)
669    }
670
671    pub fn delete_name(&mut self, name: &str, scope: NameScope) -> Result<(), ExcelError> {
672        self.graph.delete_name(name, scope)
673    }
674
675    pub fn define_table(
676        &mut self,
677        name: &str,
678        range: crate::reference::RangeRef,
679        headers: Vec<String>,
680        totals_row: bool,
681    ) -> Result<(), ExcelError> {
682        self.graph.define_table(name, range, headers, totals_row)
683    }
684
685    pub fn define_source_scalar(
686        &mut self,
687        name: &str,
688        version: Option<u64>,
689    ) -> Result<(), ExcelError> {
690        self.graph.define_source_scalar(name, version)
691    }
692
693    pub fn define_source_table(
694        &mut self,
695        name: &str,
696        version: Option<u64>,
697    ) -> Result<(), ExcelError> {
698        self.graph.define_source_table(name, version)
699    }
700
701    pub fn set_source_scalar_version(
702        &mut self,
703        name: &str,
704        version: Option<u64>,
705    ) -> Result<(), ExcelError> {
706        self.graph.set_source_scalar_version(name, version)
707    }
708
709    pub fn set_source_table_version(
710        &mut self,
711        name: &str,
712        version: Option<u64>,
713    ) -> Result<(), ExcelError> {
714        self.graph.set_source_table_version(name, version)
715    }
716
717    pub fn invalidate_source(&mut self, name: &str) -> Result<(), ExcelError> {
718        self.graph.invalidate_source(name)
719    }
720
721    pub fn vertex_value(&self, vertex: VertexId) -> Option<LiteralValue> {
722        self.graph.get_value(vertex)
723    }
724
725    pub fn graph_cell_value(&self, sheet: &str, row: u32, col: u32) -> Option<LiteralValue> {
726        self.graph.get_cell_value(sheet, row, col)
727    }
728
729    pub fn vertex_for_cell(&self, cell: &CellRef) -> Option<VertexId> {
730        self.graph.get_vertex_for_cell(cell)
731    }
732
733    pub fn evaluation_vertices(&self) -> Vec<VertexId> {
734        self.graph.get_evaluation_vertices()
735    }
736
737    pub fn set_first_load_assume_new(&mut self, enabled: bool) {
738        self.graph.set_first_load_assume_new(enabled);
739    }
740
741    pub fn reset_ensure_touched(&mut self) {
742        self.graph.reset_ensure_touched();
743    }
744
745    pub fn finalize_sheet_index(&mut self, sheet: &str) {
746        self.graph.finalize_sheet_index(sheet);
747    }
748
749    pub fn edit_with_logger<T>(
750        &mut self,
751        log: &mut crate::engine::ChangeLog,
752        f: impl FnOnce(&mut crate::engine::VertexEditor) -> T,
753    ) -> T {
754        let mut editor = crate::engine::VertexEditor::with_logger(&mut self.graph, log);
755        f(&mut editor)
756    }
757
758    pub fn undo_logged(
759        &mut self,
760        undo: &mut crate::engine::graph::editor::undo_engine::UndoEngine,
761        log: &mut crate::engine::ChangeLog,
762    ) -> Result<(), crate::engine::EditorError> {
763        undo.undo(&mut self.graph, log)
764    }
765
766    pub fn redo_logged(
767        &mut self,
768        undo: &mut crate::engine::graph::editor::undo_engine::UndoEngine,
769        log: &mut crate::engine::ChangeLog,
770    ) -> Result<(), crate::engine::EditorError> {
771        undo.redo(&mut self.graph, log)
772    }
773
774    pub fn set_default_sheet_by_name(&mut self, name: &str) {
775        self.graph.set_default_sheet_by_name(name);
776    }
777
778    pub fn set_default_sheet_by_id(&mut self, id: SheetId) {
779        self.graph.set_default_sheet_by_id(id);
780    }
781
782    pub fn set_sheet_index_mode(&mut self, mode: crate::engine::SheetIndexMode) {
783        self.graph.set_sheet_index_mode(mode);
784    }
785
786    /// Mark data edited: bump snapshot and set edited flag
787    pub fn mark_data_edited(&mut self) {
788        self.snapshot_id
789            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
790        self.has_edited = true;
791    }
792
793    /// Access Arrow sheet store (read-only)
794    pub fn sheet_store(&self) -> &SheetStore {
795        &self.arrow_sheets
796    }
797
798    /// Access Arrow sheet store (mutable)
799    pub fn sheet_store_mut(&mut self) -> &mut SheetStore {
800        &mut self.arrow_sheets
801    }
802
803    /// Stage a formula text instead of inserting into the graph (used when deferring is enabled).
804    pub fn stage_formula_text(&mut self, sheet: &str, row: u32, col: u32, text: String) {
805        self.staged_formulas
806            .entry(sheet.to_string())
807            .or_default()
808            .push((row, col, text));
809    }
810
811    /// Get a staged formula text for a given cell if present (cloned).
812    pub fn get_staged_formula_text(&self, sheet: &str, row: u32, col: u32) -> Option<String> {
813        self.staged_formulas.get(sheet).and_then(|v| {
814            v.iter()
815                .find(|(r, c, _)| *r == row && *c == col)
816                .map(|(_, _, s)| s.clone())
817        })
818    }
819
820    /// Build graph for all staged formulas.
821    pub fn build_graph_all(&mut self) -> Result<(), formualizer_parse::ExcelError> {
822        if self.staged_formulas.is_empty() {
823            return Ok(());
824        }
825        // Take staged formulas before borrowing graph via builder
826        let staged = std::mem::take(&mut self.staged_formulas);
827        for sheet in staged.keys() {
828            let _ = self.add_sheet(sheet);
829        }
830        let mut builder = self.begin_bulk_ingest();
831        for (sheet, entries) in staged {
832            let sid = builder.add_sheet(&sheet);
833            // Parse with small cache for shared formulas
834            let mut cache: rustc_hash::FxHashMap<String, formualizer_parse::ASTNode> =
835                rustc_hash::FxHashMap::default();
836            cache.reserve(4096);
837            for (row, col, txt) in entries {
838                let key = if txt.starts_with('=') {
839                    txt
840                } else {
841                    format!("={txt}")
842                };
843                let ast = if let Some(p) = cache.get(&key) {
844                    p.clone()
845                } else {
846                    let parsed = formualizer_parse::parser::parse(&key).map_err(|e| {
847                        formualizer_parse::ExcelError::new(formualizer_parse::ExcelErrorKind::Value)
848                            .with_message(e.to_string())
849                    })?;
850                    cache.insert(key.clone(), parsed.clone());
851                    parsed
852                };
853                builder.add_formulas(sid, std::iter::once((row, col, ast)));
854            }
855        }
856        let _ = builder.finish();
857        Ok(())
858    }
859
860    /// Build graph for specific sheets (consuming only those staged entries).
861    pub fn build_graph_for_sheets<'a, I: IntoIterator<Item = &'a str>>(
862        &mut self,
863        sheets: I,
864    ) -> Result<(), formualizer_parse::ExcelError> {
865        let mut any = false;
866        // Collect entries first to avoid aliasing self while builder borrows graph
867        struct SheetFormulas<'s> {
868            sheet: &'s str,
869            entries: Vec<(u32, u32, String)>,
870        }
871        let mut collected: Vec<SheetFormulas<'_>> = Vec::new();
872        for s in sheets {
873            if let Some(entries) = self.staged_formulas.remove(s) {
874                collected.push(SheetFormulas { sheet: s, entries });
875            }
876        }
877        for SheetFormulas { sheet, .. } in collected.iter() {
878            let _ = self.add_sheet(sheet);
879        }
880        let mut builder = self.begin_bulk_ingest();
881        // small parse cache per call
882        let mut cache: rustc_hash::FxHashMap<String, formualizer_parse::ASTNode> =
883            rustc_hash::FxHashMap::default();
884        cache.reserve(4096);
885        for SheetFormulas { sheet: s, entries } in collected.into_iter() {
886            any = true;
887            let sid = builder.add_sheet(s);
888            for (row, col, txt) in entries {
889                let key = if txt.starts_with('=') {
890                    txt
891                } else {
892                    format!("={txt}")
893                };
894                let ast = if let Some(p) = cache.get(&key) {
895                    p.clone()
896                } else {
897                    let parsed = formualizer_parse::parser::parse(&key).map_err(|e| {
898                        formualizer_parse::ExcelError::new(formualizer_parse::ExcelErrorKind::Value)
899                            .with_message(e.to_string())
900                    })?;
901                    cache.insert(key.clone(), parsed.clone());
902                    parsed
903                };
904                builder.add_formulas(sid, std::iter::once((row, col, ast)));
905            }
906        }
907        if any {
908            let _ = builder.finish();
909        }
910        Ok(())
911    }
912
913    /// Begin bulk Arrow ingest for base values (Phase A)
914    pub fn begin_bulk_ingest_arrow(
915        &mut self,
916    ) -> crate::engine::arrow_ingest::ArrowBulkIngestBuilder<'_, R> {
917        crate::engine::arrow_ingest::ArrowBulkIngestBuilder::new(self)
918    }
919
920    /// Begin bulk updates to Arrow store (Phase C)
921    pub fn begin_bulk_update_arrow(
922        &mut self,
923    ) -> crate::engine::arrow_ingest::ArrowBulkUpdateBuilder<'_, R> {
924        crate::engine::arrow_ingest::ArrowBulkUpdateBuilder::new(self)
925    }
926
927    /// Insert rows (1-based) and mirror into Arrow store when enabled
928    pub fn insert_rows(
929        &mut self,
930        sheet: &str,
931        before: u32,
932        count: u32,
933    ) -> Result<crate::engine::graph::editor::vertex_editor::ShiftSummary, crate::engine::EditorError>
934    {
935        use crate::engine::graph::editor::vertex_editor::VertexEditor;
936        let sheet_id = self.graph.sheet_id(sheet).ok_or(
937            crate::engine::graph::editor::vertex_editor::EditorError::InvalidName {
938                name: sheet.to_string(),
939                reason: "Unknown sheet".to_string(),
940            },
941        )?;
942        let mut editor = VertexEditor::new(&mut self.graph);
943        let summary = editor.insert_rows(sheet_id, before, count)?;
944        if let Some(asheet) = self.arrow_sheets.sheet_mut(sheet) {
945            let before0 = before.saturating_sub(1) as usize;
946            asheet.insert_rows(before0, count as usize);
947        }
948        self.snapshot_id
949            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
950        self.has_edited = true;
951        Ok(summary)
952    }
953
954    /// Delete rows (1-based) and mirror into Arrow store when enabled
955    pub fn delete_rows(
956        &mut self,
957        sheet: &str,
958        start: u32,
959        count: u32,
960    ) -> Result<crate::engine::graph::editor::vertex_editor::ShiftSummary, crate::engine::EditorError>
961    {
962        use crate::engine::graph::editor::vertex_editor::VertexEditor;
963        let sheet_id = self.graph.sheet_id(sheet).ok_or(
964            crate::engine::graph::editor::vertex_editor::EditorError::InvalidName {
965                name: sheet.to_string(),
966                reason: "Unknown sheet".to_string(),
967            },
968        )?;
969        let mut editor = VertexEditor::new(&mut self.graph);
970        let summary = editor.delete_rows(sheet_id, start, count)?;
971        if let Some(asheet) = self.arrow_sheets.sheet_mut(sheet) {
972            let start0 = start.saturating_sub(1) as usize;
973            asheet.delete_rows(start0, count as usize);
974        }
975        self.snapshot_id
976            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
977        self.has_edited = true;
978        Ok(summary)
979    }
980
981    /// Insert columns (1-based) and mirror into Arrow store when enabled
982    pub fn insert_columns(
983        &mut self,
984        sheet: &str,
985        before: u32,
986        count: u32,
987    ) -> Result<crate::engine::graph::editor::vertex_editor::ShiftSummary, crate::engine::EditorError>
988    {
989        use crate::engine::graph::editor::vertex_editor::VertexEditor;
990        let sheet_id = self.graph.sheet_id(sheet).ok_or(
991            crate::engine::graph::editor::vertex_editor::EditorError::InvalidName {
992                name: sheet.to_string(),
993                reason: "Unknown sheet".to_string(),
994            },
995        )?;
996        let mut editor = VertexEditor::new(&mut self.graph);
997        let summary = editor.insert_columns(sheet_id, before, count)?;
998        if let Some(asheet) = self.arrow_sheets.sheet_mut(sheet) {
999            let before0 = before.saturating_sub(1) as usize;
1000            asheet.insert_columns(before0, count as usize);
1001        }
1002        self.snapshot_id
1003            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1004        self.has_edited = true;
1005        Ok(summary)
1006    }
1007
1008    /// Delete columns (1-based) and mirror into Arrow store when enabled
1009    pub fn delete_columns(
1010        &mut self,
1011        sheet: &str,
1012        start: u32,
1013        count: u32,
1014    ) -> Result<crate::engine::graph::editor::vertex_editor::ShiftSummary, crate::engine::EditorError>
1015    {
1016        use crate::engine::graph::editor::vertex_editor::VertexEditor;
1017        let sheet_id = self.graph.sheet_id(sheet).ok_or(
1018            crate::engine::graph::editor::vertex_editor::EditorError::InvalidName {
1019                name: sheet.to_string(),
1020                reason: "Unknown sheet".to_string(),
1021            },
1022        )?;
1023        let mut editor = VertexEditor::new(&mut self.graph);
1024        let summary = editor.delete_columns(sheet_id, start, count)?;
1025        if let Some(asheet) = self.arrow_sheets.sheet_mut(sheet) {
1026            let start0 = start.saturating_sub(1) as usize;
1027            asheet.delete_columns(start0, count as usize);
1028        }
1029        self.snapshot_id
1030            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1031        self.has_edited = true;
1032        Ok(summary)
1033    }
1034    /// Arrow-backed used row bounds across a column span (1-based inclusive cols).
1035    fn arrow_used_row_bounds(
1036        &self,
1037        sheet: &str,
1038        start_col: u32,
1039        end_col: u32,
1040    ) -> Option<(u32, u32)> {
1041        let a = self.sheet_store().sheet(sheet)?;
1042        if a.columns.is_empty() {
1043            return None;
1044        }
1045        let sc0 = start_col.saturating_sub(1) as usize;
1046        let ec0 = end_col.saturating_sub(1) as usize;
1047        let col_hi = a.columns.len().saturating_sub(1);
1048        if sc0 > col_hi {
1049            return None;
1050        }
1051        let ec0 = ec0.min(col_hi);
1052        // Pass-scoped cache with snapshot guard
1053        let snap = self.data_snapshot_id();
1054        let mut min_r0: Option<usize> = None;
1055        for ci in sc0..=ec0 {
1056            let sheet_id = self.graph.sheet_id(sheet)?;
1057            if let Some((Some(mv), _)) = self.row_bounds_cache.read().ok().and_then(|g| {
1058                g.as_ref()
1059                    .and_then(|c| c.get_row_bounds(sheet_id, ci, snap))
1060            }) {
1061                let mv = mv as usize;
1062                min_r0 = Some(min_r0.map(|m| m.min(mv)).unwrap_or(mv));
1063                continue;
1064            }
1065            // Compute and store
1066            let (min_c, max_c) = Self::scan_column_used_bounds(a, ci);
1067            if let Ok(mut g) = self.row_bounds_cache.write() {
1068                g.get_or_insert_with(|| RowBoundsCache::new(snap))
1069                    .put_row_bounds(sheet_id, ci, snap, (min_c, max_c));
1070            }
1071            if let Some(m) = min_c {
1072                min_r0 = Some(min_r0.map(|mm| mm.min(m as usize)).unwrap_or(m as usize));
1073            }
1074        }
1075        min_r0?;
1076        let mut max_r0: Option<usize> = None;
1077        for ci in sc0..=ec0 {
1078            let sheet_id = self.graph.sheet_id(sheet)?;
1079            if let Some((_, Some(mv))) = self.row_bounds_cache.read().ok().and_then(|g| {
1080                g.as_ref()
1081                    .and_then(|c| c.get_row_bounds(sheet_id, ci, snap))
1082            }) {
1083                let mv = mv as usize;
1084                max_r0 = Some(max_r0.map(|m| m.max(mv)).unwrap_or(mv));
1085                continue;
1086            }
1087            let (_min_c, max_c) = Self::scan_column_used_bounds(a, ci);
1088            if let Ok(mut g) = self.row_bounds_cache.write() {
1089                g.get_or_insert_with(|| RowBoundsCache::new(snap))
1090                    .put_row_bounds(sheet_id, ci, snap, (_min_c, max_c));
1091            }
1092            if let Some(m) = max_c {
1093                max_r0 = Some(max_r0.map(|mm| mm.max(m as usize)).unwrap_or(m as usize));
1094            }
1095        }
1096        match (min_r0, max_r0) {
1097            (Some(a0), Some(b0)) => Some(((a0 as u32) + 1, (b0 as u32) + 1)),
1098            _ => None,
1099        }
1100    }
1101
1102    fn scan_column_used_bounds(
1103        a: &crate::arrow_store::ArrowSheet,
1104        ci: usize,
1105    ) -> (Option<u32>, Option<u32>) {
1106        let col = &a.columns[ci];
1107
1108        // Min: scan dense chunks first, then sparse chunks in ascending index order.
1109        let mut min_r0: Option<u32> = None;
1110        for (chunk_idx, chunk) in col.chunks.iter().enumerate() {
1111            let tags = chunk.type_tag.values();
1112            for (off, &t) in tags.iter().enumerate() {
1113                let overlay_non_empty = chunk
1114                    .overlay
1115                    .get(off)
1116                    .map(|ov| !matches!(ov, crate::arrow_store::OverlayValue::Empty))
1117                    .unwrap_or(false);
1118                if overlay_non_empty || t != crate::arrow_store::TypeTag::Empty as u8 {
1119                    let Some(&chunk_start) = a.chunk_starts.get(chunk_idx) else {
1120                        break;
1121                    };
1122                    let row0 = chunk_start + off;
1123                    min_r0 = Some(row0 as u32);
1124                    break;
1125                }
1126            }
1127            if min_r0.is_some() {
1128                break;
1129            }
1130        }
1131        if min_r0.is_none() && !col.sparse_chunks.is_empty() {
1132            let mut sparse_idxs: Vec<usize> = col.sparse_chunks.keys().copied().collect();
1133            sparse_idxs.sort_unstable();
1134            for chunk_idx in sparse_idxs {
1135                let Some(chunk) = col.sparse_chunks.get(&chunk_idx) else {
1136                    continue;
1137                };
1138                let Some(&chunk_start) = a.chunk_starts.get(chunk_idx) else {
1139                    continue;
1140                };
1141                let tags = chunk.type_tag.values();
1142                for (off, &t) in tags.iter().enumerate() {
1143                    let overlay_non_empty = chunk
1144                        .overlay
1145                        .get(off)
1146                        .map(|ov| !matches!(ov, crate::arrow_store::OverlayValue::Empty))
1147                        .unwrap_or(false);
1148                    if overlay_non_empty || t != crate::arrow_store::TypeTag::Empty as u8 {
1149                        let row0 = chunk_start + off;
1150                        min_r0 = Some(row0 as u32);
1151                        break;
1152                    }
1153                }
1154                if min_r0.is_some() {
1155                    break;
1156                }
1157            }
1158        }
1159
1160        // Max: scan sparse chunks in descending index order, then dense chunks in reverse.
1161        let mut max_r0: Option<u32> = None;
1162        if !col.sparse_chunks.is_empty() {
1163            let mut sparse_idxs: Vec<usize> = col.sparse_chunks.keys().copied().collect();
1164            sparse_idxs.sort_unstable_by(|a, b| b.cmp(a));
1165            for chunk_idx in sparse_idxs {
1166                let Some(chunk) = col.sparse_chunks.get(&chunk_idx) else {
1167                    continue;
1168                };
1169                let Some(&chunk_start) = a.chunk_starts.get(chunk_idx) else {
1170                    continue;
1171                };
1172                let tags = chunk.type_tag.values();
1173                for (rev_idx, &t) in tags.iter().enumerate().rev() {
1174                    let overlay_non_empty = chunk
1175                        .overlay
1176                        .get(rev_idx)
1177                        .map(|ov| !matches!(ov, crate::arrow_store::OverlayValue::Empty))
1178                        .unwrap_or(false);
1179                    if overlay_non_empty || t != crate::arrow_store::TypeTag::Empty as u8 {
1180                        let row0 = chunk_start + rev_idx;
1181                        max_r0 = Some(row0 as u32);
1182                        break;
1183                    }
1184                }
1185                if max_r0.is_some() {
1186                    break;
1187                }
1188            }
1189        }
1190        if max_r0.is_none() {
1191            for (chunk_idx, chunk) in col.chunks.iter().enumerate().rev() {
1192                let tags = chunk.type_tag.values();
1193                for (rev_idx, &t) in tags.iter().enumerate().rev() {
1194                    let overlay_non_empty = chunk
1195                        .overlay
1196                        .get(rev_idx)
1197                        .map(|ov| !matches!(ov, crate::arrow_store::OverlayValue::Empty))
1198                        .unwrap_or(false);
1199                    if overlay_non_empty || t != crate::arrow_store::TypeTag::Empty as u8 {
1200                        let Some(&chunk_start) = a.chunk_starts.get(chunk_idx) else {
1201                            break;
1202                        };
1203                        let row0 = chunk_start + rev_idx;
1204                        max_r0 = Some(row0 as u32);
1205                        break;
1206                    }
1207                }
1208                if max_r0.is_some() {
1209                    break;
1210                }
1211            }
1212        }
1213
1214        (min_r0, max_r0)
1215    }
1216
1217    /// Arrow-backed used column bounds across a row span (1-based inclusive rows).
1218    fn arrow_used_col_bounds(
1219        &self,
1220        sheet: &str,
1221        start_row: u32,
1222        end_row: u32,
1223    ) -> Option<(u32, u32)> {
1224        let a = self.sheet_store().sheet(sheet)?;
1225        if a.columns.is_empty() {
1226            return None;
1227        }
1228        let sr0 = start_row.saturating_sub(1) as usize;
1229        let er0 = end_row.saturating_sub(1) as usize;
1230        if sr0 > er0 {
1231            return None;
1232        }
1233        // Map start/end rows into chunk ranges
1234        // We will scan each column for any non-empty within [sr0..=er0]
1235        let mut min_c0: Option<usize> = None;
1236        let mut max_c0: Option<usize> = None;
1237        // Precompute chunk bounds for row range
1238        for (ci, col) in a.columns.iter().enumerate() {
1239            let mut any_in_range = false;
1240
1241            let scan_chunk = |chunk_idx: usize, chunk: &crate::arrow_store::ColumnChunk| -> bool {
1242                let Some(&chunk_start) = a.chunk_starts.get(chunk_idx) else {
1243                    return false;
1244                };
1245                let chunk_len = chunk.type_tag.len();
1246                if chunk_len == 0 {
1247                    return false;
1248                }
1249                let chunk_end = chunk_start + chunk_len.saturating_sub(1);
1250                // check intersection
1251                if sr0 > chunk_end || er0 < chunk_start {
1252                    return false;
1253                }
1254                let start_off = sr0.max(chunk_start) - chunk_start;
1255                let end_off = er0.min(chunk_end) - chunk_start;
1256                let tags = chunk.type_tag.values();
1257                for off in start_off..=end_off {
1258                    let overlay_non_empty = chunk
1259                        .overlay
1260                        .get(off)
1261                        .map(|ov| !matches!(ov, crate::arrow_store::OverlayValue::Empty))
1262                        .unwrap_or(false);
1263                    if overlay_non_empty || tags[off] != crate::arrow_store::TypeTag::Empty as u8 {
1264                        return true;
1265                    }
1266                }
1267                false
1268            };
1269
1270            for (chunk_idx, chunk) in col.chunks.iter().enumerate() {
1271                if scan_chunk(chunk_idx, chunk) {
1272                    any_in_range = true;
1273                    break;
1274                }
1275            }
1276
1277            if !any_in_range && !col.sparse_chunks.is_empty() {
1278                for (&chunk_idx, chunk) in col.sparse_chunks.iter() {
1279                    if scan_chunk(chunk_idx, chunk) {
1280                        any_in_range = true;
1281                        break;
1282                    }
1283                }
1284            }
1285
1286            if any_in_range {
1287                min_c0 = Some(min_c0.map(|m| m.min(ci)).unwrap_or(ci));
1288                max_c0 = Some(max_c0.map(|m| m.max(ci)).unwrap_or(ci));
1289            }
1290        }
1291        match (min_c0, max_c0) {
1292            (Some(a0), Some(b0)) => Some(((a0 as u32) + 1, (b0 as u32) + 1)),
1293            _ => None,
1294        }
1295    }
1296
1297    /// Mirror a single cell value into the Arrow overlay if enabled.
1298    /// Handles capacity growth, per-chunk overlay set, and heuristic compaction.
1299    fn mirror_value_to_overlay(&mut self, sheet: &str, row: u32, col: u32, value: &LiteralValue) {
1300        if !(self.config.arrow_storage_enabled && self.config.delta_overlay_enabled) {
1301            return;
1302        }
1303        if self.arrow_sheets.sheet(sheet).is_none() {
1304            self.arrow_sheets
1305                .sheets
1306                .push(crate::arrow_store::ArrowSheet {
1307                    name: std::sync::Arc::<str>::from(sheet),
1308                    columns: Vec::new(),
1309                    nrows: 0,
1310                    chunk_starts: Vec::new(),
1311                    chunk_rows: 32 * 1024,
1312                });
1313        }
1314
1315        let row0 = row.saturating_sub(1) as usize;
1316        let col0 = col.saturating_sub(1) as usize;
1317
1318        let asheet = self
1319            .arrow_sheets
1320            .sheet_mut(sheet)
1321            .expect("ArrowSheet must exist");
1322
1323        let cur_cols = asheet.columns.len();
1324        if col0 >= cur_cols {
1325            asheet.insert_columns(cur_cols, (col0 + 1) - cur_cols);
1326        }
1327
1328        if row0 >= asheet.nrows as usize {
1329            if asheet.columns.is_empty() {
1330                asheet.insert_columns(0, 1);
1331            }
1332            asheet.ensure_row_capacity(row0 + 1);
1333        }
1334        if let Some((ch_idx, in_off)) = asheet.chunk_of_row(row0) {
1335            use crate::arrow_store::OverlayValue;
1336            let ov = match value {
1337                LiteralValue::Empty => OverlayValue::Empty,
1338                LiteralValue::Int(i) => OverlayValue::Number(*i as f64),
1339                LiteralValue::Number(n) => OverlayValue::Number(*n),
1340                LiteralValue::Boolean(b) => OverlayValue::Boolean(*b),
1341                LiteralValue::Text(s) => OverlayValue::Text(std::sync::Arc::from(s.clone())),
1342                LiteralValue::Error(e) => {
1343                    OverlayValue::Error(crate::arrow_store::map_error_code(e.kind))
1344                }
1345                LiteralValue::Date(d) => {
1346                    let dt = d.and_hms_opt(0, 0, 0).unwrap();
1347                    let serial = crate::builtins::datetime::datetime_to_serial_for(
1348                        self.config.date_system,
1349                        &dt,
1350                    );
1351                    OverlayValue::Number(serial)
1352                }
1353                LiteralValue::DateTime(dt) => {
1354                    let serial = crate::builtins::datetime::datetime_to_serial_for(
1355                        self.config.date_system,
1356                        dt,
1357                    );
1358                    OverlayValue::Number(serial)
1359                }
1360                LiteralValue::Time(t) => {
1361                    let serial = t.num_seconds_from_midnight() as f64 / 86_400.0;
1362                    OverlayValue::Number(serial)
1363                }
1364                LiteralValue::Duration(d) => {
1365                    let serial = d.num_seconds() as f64 / 86_400.0;
1366                    OverlayValue::Number(serial)
1367                }
1368                LiteralValue::Pending => OverlayValue::Pending,
1369                LiteralValue::Array(_) => OverlayValue::Error(crate::arrow_store::map_error_code(
1370                    formualizer_common::ExcelErrorKind::Value,
1371                )),
1372            };
1373            if let Some(ch) = asheet.ensure_column_chunk_mut(col0, ch_idx) {
1374                ch.overlay.set(in_off, ov);
1375            } else {
1376                return;
1377            }
1378            // Heuristic compaction: > len/50 or > 1024
1379            let abs_threshold = 1024usize;
1380            let frac_den = 50usize;
1381            if asheet.maybe_compact_chunk(col0, ch_idx, abs_threshold, frac_den) {
1382                self.overlay_compactions = self.overlay_compactions.saturating_add(1);
1383            }
1384        }
1385    }
1386
1387    fn mirror_vertex_value_to_overlay(&mut self, vertex_id: VertexId, value: &LiteralValue) {
1388        if !(self.config.arrow_storage_enabled
1389            && self.config.delta_overlay_enabled
1390            && self.config.write_formula_overlay_enabled)
1391        {
1392            return;
1393        }
1394        if !matches!(
1395            self.graph.get_vertex_kind(vertex_id),
1396            VertexKind::FormulaScalar | VertexKind::FormulaArray
1397        ) {
1398            return;
1399        }
1400        let Some(cell) = self.graph.get_cell_ref(vertex_id) else {
1401            return;
1402        };
1403        let sheet_name = self.graph.sheet_name(cell.sheet_id).to_string();
1404        self.mirror_value_to_overlay(
1405            &sheet_name,
1406            cell.coord.row() + 1,
1407            cell.coord.col() + 1,
1408            value,
1409        );
1410    }
1411
1412    fn resolve_sheet_locator_for_write(
1413        &mut self,
1414        loc: formualizer_common::SheetLocator<'_>,
1415        current_sheet: &str,
1416    ) -> Result<SheetId, ExcelError> {
1417        Ok(match loc {
1418            formualizer_common::SheetLocator::Id(id) => id,
1419            formualizer_common::SheetLocator::Name(name) => self.graph.sheet_id_mut(name.as_ref()),
1420            formualizer_common::SheetLocator::Current => self.graph.sheet_id_mut(current_sheet),
1421        })
1422    }
1423
1424    fn resolve_sheet_locator_for_read(
1425        &self,
1426        loc: formualizer_common::SheetLocator<'_>,
1427        current_sheet: &str,
1428    ) -> Result<SheetId, ExcelError> {
1429        match loc {
1430            formualizer_common::SheetLocator::Id(id) => Ok(id),
1431            formualizer_common::SheetLocator::Name(name) => self
1432                .graph
1433                .sheet_id(name.as_ref())
1434                .ok_or_else(|| ExcelError::new(ExcelErrorKind::Ref)),
1435            formualizer_common::SheetLocator::Current => self
1436                .graph
1437                .sheet_id(current_sheet)
1438                .ok_or_else(|| ExcelError::new(ExcelErrorKind::Ref)),
1439        }
1440    }
1441
1442    /// Set a cell value
1443    pub fn set_cell_value(
1444        &mut self,
1445        sheet: &str,
1446        row: u32,
1447        col: u32,
1448        value: LiteralValue,
1449    ) -> Result<(), ExcelError> {
1450        self.graph.set_cell_value(sheet, row, col, value.clone())?;
1451        // Mirror into Arrow overlay when enabled
1452        self.mirror_value_to_overlay(sheet, row, col, &value);
1453        // Advance snapshot to reflect external mutation
1454        self.snapshot_id
1455            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1456        self.has_edited = true;
1457        Ok(())
1458    }
1459
1460    pub fn set_cell_value_ref(
1461        &mut self,
1462        cell: formualizer_common::SheetCellRef<'_>,
1463        current_sheet: &str,
1464        value: LiteralValue,
1465    ) -> Result<(), ExcelError> {
1466        let owned = cell.into_owned();
1467        let sheet_id = self.resolve_sheet_locator_for_write(owned.sheet, current_sheet)?;
1468        let sheet_name = self.graph.sheet_name(sheet_id).to_string();
1469        self.set_cell_value(
1470            &sheet_name,
1471            owned.coord.row() + 1,
1472            owned.coord.col() + 1,
1473            value,
1474        )
1475    }
1476
1477    pub fn set_cell_formula_ref(
1478        &mut self,
1479        cell: formualizer_common::SheetCellRef<'_>,
1480        current_sheet: &str,
1481        ast: ASTNode,
1482    ) -> Result<(), ExcelError> {
1483        let owned = cell.into_owned();
1484        let sheet_id = self.resolve_sheet_locator_for_write(owned.sheet, current_sheet)?;
1485        let sheet_name = self.graph.sheet_name(sheet_id).to_string();
1486        self.set_cell_formula(
1487            &sheet_name,
1488            owned.coord.row() + 1,
1489            owned.coord.col() + 1,
1490            ast,
1491        )
1492    }
1493
1494    pub fn get_cell_value_ref(
1495        &self,
1496        cell: formualizer_common::SheetCellRef<'_>,
1497        current_sheet: &str,
1498    ) -> Result<Option<LiteralValue>, ExcelError> {
1499        let owned = cell.into_owned();
1500        let sheet_id = self.resolve_sheet_locator_for_read(owned.sheet, current_sheet)?;
1501        let sheet_name = self.graph.sheet_name(sheet_id);
1502        Ok(self.get_cell_value(sheet_name, owned.coord.row() + 1, owned.coord.col() + 1))
1503    }
1504
1505    pub fn resolve_range_view_sheet_ref<'c>(
1506        &'c self,
1507        r: &formualizer_common::SheetRef<'_>,
1508        current_sheet: &str,
1509    ) -> Result<RangeView<'c>, ExcelError> {
1510        use formualizer_common::SheetLocator;
1511
1512        let sheet_to_opt_name = |loc: SheetLocator<'_>| -> Result<Option<String>, ExcelError> {
1513            match loc {
1514                SheetLocator::Current => Ok(None),
1515                SheetLocator::Name(name) => Ok(Some(name.as_ref().to_string())),
1516                SheetLocator::Id(id) => Ok(Some(self.graph.sheet_name(id).to_string())),
1517            }
1518        };
1519
1520        let rt = match r {
1521            formualizer_common::SheetRef::Cell(cell) => ReferenceType::Cell {
1522                sheet: sheet_to_opt_name(cell.sheet.clone())?,
1523                row: cell.coord.row() + 1,
1524                col: cell.coord.col() + 1,
1525                row_abs: cell.coord.row_abs(),
1526                col_abs: cell.coord.col_abs(),
1527            },
1528            formualizer_common::SheetRef::Range(range) => ReferenceType::Range {
1529                sheet: sheet_to_opt_name(range.sheet.clone())?,
1530                start_row: range.start_row.map(|b| b.index + 1),
1531                start_col: range.start_col.map(|b| b.index + 1),
1532                end_row: range.end_row.map(|b| b.index + 1),
1533                end_col: range.end_col.map(|b| b.index + 1),
1534                start_row_abs: range.start_row.map(|b| b.abs).unwrap_or(false),
1535                start_col_abs: range.start_col.map(|b| b.abs).unwrap_or(false),
1536                end_row_abs: range.end_row.map(|b| b.abs).unwrap_or(false),
1537                end_col_abs: range.end_col.map(|b| b.abs).unwrap_or(false),
1538            },
1539        };
1540
1541        crate::traits::EvaluationContext::resolve_range_view(self, &rt, current_sheet)
1542    }
1543
1544    /// Set a cell formula
1545    pub fn set_cell_formula(
1546        &mut self,
1547        sheet: &str,
1548        row: u32,
1549        col: u32,
1550        ast: ASTNode,
1551    ) -> Result<(), ExcelError> {
1552        let volatile = self.is_ast_volatile_with_provider(&ast);
1553        self.graph
1554            .set_cell_formula_with_volatility(sheet, row, col, ast, volatile)?;
1555        // Advance snapshot to reflect external mutation
1556        self.snapshot_id
1557            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1558        self.has_edited = true;
1559        Ok(())
1560    }
1561
1562    /// Bulk set many formulas on a sheet. Skips per-cell snapshot bumping and minimizes edge rebuilds.
1563    pub fn bulk_set_formulas<I>(&mut self, sheet: &str, items: I) -> Result<usize, ExcelError>
1564    where
1565        I: IntoIterator<Item = (u32, u32, ASTNode)>,
1566    {
1567        let collected: Vec<(u32, u32, ASTNode)> = items.into_iter().collect();
1568        let vol_flags: Vec<bool> = collected
1569            .iter()
1570            .map(|(_, _, ast)| self.is_ast_volatile_with_provider(ast))
1571            .collect();
1572        let n = self
1573            .graph
1574            .bulk_set_formulas_with_volatility(sheet, collected, vol_flags)?;
1575        // Single snapshot bump after batch
1576        self.snapshot_id
1577            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1578        if n > 0 {
1579            self.has_edited = true;
1580        }
1581        Ok(n)
1582    }
1583
1584    /// Get a cell value
1585    pub fn get_cell_value(&self, sheet: &str, row: u32, col: u32) -> Option<LiteralValue> {
1586        // If a vertex exists in the graph (formula or value cell), use graph value to preserve types.
1587        // Only fall back to Arrow for cells not in the graph.
1588        if let Some(sheet_id) = self.graph.sheet_id(sheet) {
1589            let coord = Coord::from_excel(row, col, true, true);
1590            let addr = CellRef::new(sheet_id, coord);
1591            if let Some(vid) = self.graph.get_vertex_id_for_address(&addr) {
1592                match self.graph.get_vertex_kind(*vid) {
1593                    VertexKind::FormulaScalar | VertexKind::FormulaArray | VertexKind::Cell => {
1594                        return self.graph.get_cell_value(sheet, row, col);
1595                    }
1596                    _ => {}
1597                }
1598            }
1599        }
1600        if let Some(asheet) = self.sheet_store().sheet(sheet) {
1601            let r0 = row.saturating_sub(1) as usize;
1602            let c0 = col.saturating_sub(1) as usize;
1603            let av = asheet.range_view(r0, c0, r0, c0);
1604            let v = av.get_cell(0, 0);
1605            return Some(v);
1606        }
1607        self.graph.get_cell_value(sheet, row, col)
1608    }
1609
1610    /// Get formula AST (if any) and current stored value for a cell
1611    pub fn get_cell(
1612        &self,
1613        sheet: &str,
1614        row: u32,
1615        col: u32,
1616    ) -> Option<(Option<formualizer_parse::ASTNode>, Option<LiteralValue>)> {
1617        let v = self.get_cell_value(sheet, row, col);
1618        let sheet_id = self.graph.sheet_id(sheet)?;
1619        let coord = Coord::from_excel(row, col, true, true);
1620        let cell = CellRef::new(sheet_id, coord);
1621        let vid = self.graph.get_vertex_for_cell(&cell)?;
1622        let ast = self.graph.get_formula_id(vid).and_then(|ast_id| {
1623            self.graph
1624                .data_store()
1625                .retrieve_ast(ast_id, self.graph.sheet_reg())
1626        });
1627        Some((ast, v))
1628    }
1629
1630    /// Begin batch operations - defer CSR rebuilds for better performance
1631    pub fn begin_batch(&mut self) {
1632        self.graph.begin_batch();
1633    }
1634
1635    /// End batch operations and trigger CSR rebuild
1636    pub fn end_batch(&mut self) {
1637        self.graph.end_batch();
1638    }
1639
1640    /// Evaluate a single vertex.
1641    /// This is the core of the sequential evaluation logic for Milestone 3.1.
1642    #[inline]
1643    fn record_cell_if_changed(
1644        delta: &mut DeltaCollector,
1645        cell: &CellRef,
1646        old: &LiteralValue,
1647        new: &LiteralValue,
1648    ) {
1649        if old != new {
1650            delta.record_cell(cell.sheet_id, cell.coord.row(), cell.coord.col());
1651        }
1652    }
1653
1654    pub fn evaluate_vertex(&mut self, vertex_id: VertexId) -> Result<LiteralValue, ExcelError> {
1655        self.evaluate_vertex_impl(vertex_id, None)
1656    }
1657
1658    fn evaluate_vertex_impl(
1659        &mut self,
1660        vertex_id: VertexId,
1661        delta: Option<&mut DeltaCollector>,
1662    ) -> Result<LiteralValue, ExcelError> {
1663        let mut delta = delta;
1664        // Check if vertex exists
1665        if !self.graph.vertex_exists(vertex_id) {
1666            return Err(ExcelError::new(formualizer_common::ExcelErrorKind::Ref)
1667                .with_message(format!("Vertex not found: {vertex_id:?}")));
1668        }
1669
1670        // Get vertex kind and check if it needs evaluation
1671        let kind = self.graph.get_vertex_kind(vertex_id);
1672        let sheet_id = self.graph.get_vertex_sheet_id(vertex_id);
1673
1674        let ast_id = match kind {
1675            VertexKind::FormulaScalar | VertexKind::FormulaArray => {
1676                if let Some(ast_id) = self.graph.get_formula_id(vertex_id) {
1677                    ast_id
1678                } else {
1679                    return Ok(LiteralValue::Int(0));
1680                }
1681            }
1682            VertexKind::Empty | VertexKind::Cell => {
1683                // Check if there's a value stored
1684                if let Some(value) = self.graph.get_value(vertex_id) {
1685                    return Ok(value.clone());
1686                } else {
1687                    return Ok(LiteralValue::Int(0)); // Empty cells evaluate to 0
1688                }
1689            }
1690            VertexKind::NamedScalar => {
1691                let value = self.evaluate_named_scalar(vertex_id, sheet_id)?;
1692                return Ok(value);
1693            }
1694            VertexKind::NamedArray => {
1695                return Err(ExcelError::new(ExcelErrorKind::NImpl)
1696                    .with_message("Array named ranges not yet implemented"));
1697            }
1698            VertexKind::InfiniteRange
1699            | VertexKind::Range
1700            | VertexKind::External
1701            | VertexKind::Table => {
1702                // Not directly evaluatable here; return stored or 0
1703                if let Some(value) = self.graph.get_value(vertex_id) {
1704                    return Ok(value.clone());
1705                } else {
1706                    return Ok(LiteralValue::Int(0));
1707                }
1708            }
1709        };
1710
1711        // The interpreter uses a reference to the engine as the context.
1712        let sheet_name = self.graph.sheet_name(sheet_id);
1713        let cell_ref = self
1714            .graph
1715            .get_cell_ref(vertex_id)
1716            .expect("cell ref for vertex");
1717        let interpreter = Interpreter::new_with_cell(self, sheet_name, cell_ref);
1718
1719        let result =
1720            interpreter.evaluate_arena_ast(ast_id, self.graph.data_store(), self.graph.sheet_reg());
1721
1722        // If array result, perform spill from the anchor cell
1723        match result {
1724            Ok(cv) => {
1725                let result_literal = cv.into_literal();
1726                match result_literal {
1727                    LiteralValue::Array(rows) => {
1728                        // Update kind to FormulaArray for tracking
1729                        self.graph
1730                            .set_kind(vertex_id, crate::engine::vertex::VertexKind::FormulaArray);
1731                        // Build target cells rectangle starting from anchor
1732                        let anchor = self
1733                            .graph
1734                            .get_cell_ref(vertex_id)
1735                            .expect("cell ref for vertex");
1736                        let sheet_id = anchor.sheet_id;
1737                        let h = rows.len() as u32;
1738                        let w = rows.first().map(|r| r.len()).unwrap_or(0) as u32;
1739                        // Bounds check to avoid out-of-range writes (align to AbsCoord capacity)
1740                        const PACKED_MAX_ROW: u32 = 1_048_575; // 20-bit max
1741                        const PACKED_MAX_COL: u32 = 16_383; // 14-bit max
1742                        let end_row = anchor.coord.row().saturating_add(h).saturating_sub(1);
1743                        let end_col = anchor.coord.col().saturating_add(w).saturating_sub(1);
1744                        if end_row > PACKED_MAX_ROW || end_col > PACKED_MAX_COL {
1745                            let spill_err = ExcelError::new(ExcelErrorKind::Spill)
1746                                .with_message("Spill exceeds sheet bounds")
1747                                .with_extra(formualizer_common::ExcelErrorExtra::Spill {
1748                                    expected_rows: h,
1749                                    expected_cols: w,
1750                                });
1751                            let spill_val = LiteralValue::Error(spill_err.clone());
1752                            self.graph.update_vertex_value(vertex_id, spill_val.clone());
1753                            return Ok(spill_val);
1754                        }
1755                        let mut targets = Vec::new();
1756                        for r in 0..h {
1757                            for c in 0..w {
1758                                targets.push(self.graph.make_cell_ref_internal(
1759                                    sheet_id,
1760                                    anchor.coord.row() + r,
1761                                    anchor.coord.col() + c,
1762                                ));
1763                            }
1764                        }
1765
1766                        // Plan spill via spill manager shim
1767                        match self.spill_mgr.reserve(
1768                            vertex_id,
1769                            anchor,
1770                            SpillShape { rows: h, cols: w },
1771                            SpillMeta {
1772                                epoch: self.recalc_epoch,
1773                                config: self.config.spill,
1774                            },
1775                        ) {
1776                            Ok(()) => {
1777                                // Commit: write values to grid
1778                                // Default conflict policy is Error + FirstWins; reserve() enforces in-flight locks
1779                                // and plan_spill_region enforces overlap with committed formulas/spills/values.
1780                                if let Err(e) = self.commit_spill_and_mirror(
1781                                    vertex_id,
1782                                    &targets,
1783                                    rows.clone(),
1784                                    delta.as_deref_mut(),
1785                                ) {
1786                                    // If commit fails, mark as error
1787                                    if let Some(d) = delta.as_deref_mut() {
1788                                        let old = self
1789                                            .graph
1790                                            .get_value(vertex_id)
1791                                            .unwrap_or(LiteralValue::Empty);
1792                                        let new = LiteralValue::Error(e.clone());
1793                                        if old != new {
1794                                            d.record_cell(
1795                                                anchor.sheet_id,
1796                                                anchor.coord.row(),
1797                                                anchor.coord.col(),
1798                                            );
1799                                        }
1800                                    }
1801                                    self.graph.update_vertex_value(
1802                                        vertex_id,
1803                                        LiteralValue::Error(e.clone()),
1804                                    );
1805                                    return Ok(LiteralValue::Error(e));
1806                                }
1807                                // Anchor shows the top-left value, like Excel
1808                                let top_left = rows
1809                                    .first()
1810                                    .and_then(|r| r.first())
1811                                    .cloned()
1812                                    .unwrap_or(LiteralValue::Empty);
1813                                self.graph.update_vertex_value(vertex_id, top_left.clone());
1814                                Ok(top_left)
1815                            }
1816                            Err(e) => {
1817                                let spill_err = ExcelError::new(ExcelErrorKind::Spill)
1818                                    .with_message(
1819                                        e.message.unwrap_or_else(|| "Spill blocked".to_string()),
1820                                    )
1821                                    .with_extra(formualizer_common::ExcelErrorExtra::Spill {
1822                                        expected_rows: h,
1823                                        expected_cols: w,
1824                                    });
1825                                let spill_val = LiteralValue::Error(spill_err.clone());
1826                                if let Some(d) = delta.as_deref_mut() {
1827                                    let old = self
1828                                        .graph
1829                                        .get_value(vertex_id)
1830                                        .unwrap_or(LiteralValue::Empty);
1831                                    if old != spill_val {
1832                                        d.record_cell(
1833                                            anchor.sheet_id,
1834                                            anchor.coord.row(),
1835                                            anchor.coord.col(),
1836                                        );
1837                                    }
1838                                }
1839                                self.graph.update_vertex_value(vertex_id, spill_val.clone());
1840                                Ok(spill_val)
1841                            }
1842                        }
1843                    }
1844                    other => {
1845                        // Scalar result: store value and ensure any previous spill is cleared
1846                        let spill_cells = self
1847                            .graph
1848                            .spill_cells_for_anchor(vertex_id)
1849                            .map(|cells| cells.to_vec())
1850                            .unwrap_or_default();
1851                        if let Some(d) = delta.as_deref_mut()
1852                            && let Some(anchor) = self.graph.get_cell_ref_for_vertex(vertex_id)
1853                        {
1854                            if spill_cells.is_empty() {
1855                                let old = self
1856                                    .graph
1857                                    .get_value(vertex_id)
1858                                    .unwrap_or(LiteralValue::Empty);
1859                                if old != other {
1860                                    d.record_cell(
1861                                        anchor.sheet_id,
1862                                        anchor.coord.row(),
1863                                        anchor.coord.col(),
1864                                    );
1865                                }
1866                            } else {
1867                                for cell in spill_cells.iter() {
1868                                    let sheet_name = self.graph.sheet_name(cell.sheet_id);
1869                                    let old = self
1870                                        .get_cell_value(
1871                                            sheet_name,
1872                                            cell.coord.row() + 1,
1873                                            cell.coord.col() + 1,
1874                                        )
1875                                        .unwrap_or(LiteralValue::Empty);
1876                                    let new = if cell.sheet_id == anchor.sheet_id
1877                                        && cell.coord.row() == anchor.coord.row()
1878                                        && cell.coord.col() == anchor.coord.col()
1879                                    {
1880                                        other.clone()
1881                                    } else {
1882                                        LiteralValue::Empty
1883                                    };
1884                                    Self::record_cell_if_changed(d, cell, &old, &new);
1885                                }
1886                            }
1887                        }
1888                        self.graph.clear_spill_region(vertex_id);
1889                        if self.config.arrow_storage_enabled && self.config.delta_overlay_enabled {
1890                            let empty = LiteralValue::Empty;
1891                            for cell in spill_cells.iter() {
1892                                let sheet_name = self.graph.sheet_name(cell.sheet_id).to_string();
1893                                self.mirror_value_to_overlay(
1894                                    &sheet_name,
1895                                    cell.coord.row() + 1,
1896                                    cell.coord.col() + 1,
1897                                    &empty,
1898                                );
1899                            }
1900                        }
1901                        self.graph.update_vertex_value(vertex_id, other.clone());
1902                        // Optionally mirror into Arrow overlay for Arrow-backed reads
1903                        if self.config.arrow_storage_enabled
1904                            && self.config.delta_overlay_enabled
1905                            && self.config.write_formula_overlay_enabled
1906                        {
1907                            let anchor = self
1908                                .graph
1909                                .get_cell_ref(vertex_id)
1910                                .expect("cell ref for vertex");
1911                            let sheet_name = self.graph.sheet_name(anchor.sheet_id).to_string();
1912                            self.mirror_value_to_overlay(
1913                                &sheet_name,
1914                                anchor.coord.row() + 1,
1915                                anchor.coord.col() + 1,
1916                                &other,
1917                            );
1918                        }
1919                        Ok(other)
1920                    }
1921                }
1922            }
1923            Err(e) => {
1924                // Runtime Excel error: store as a cell value instead of propagating
1925                // as an exception so bulk eval paths don't fail the whole pass.
1926                let spill_cells = self
1927                    .graph
1928                    .spill_cells_for_anchor(vertex_id)
1929                    .map(|cells| cells.to_vec())
1930                    .unwrap_or_default();
1931                let err_val = LiteralValue::Error(e.clone());
1932                if let Some(d) = delta
1933                    && let Some(anchor) = self.graph.get_cell_ref_for_vertex(vertex_id)
1934                {
1935                    if spill_cells.is_empty() {
1936                        let old = self
1937                            .graph
1938                            .get_value(vertex_id)
1939                            .unwrap_or(LiteralValue::Empty);
1940                        if old != err_val {
1941                            d.record_cell(anchor.sheet_id, anchor.coord.row(), anchor.coord.col());
1942                        }
1943                    } else {
1944                        for cell in spill_cells.iter() {
1945                            let sheet_name = self.graph.sheet_name(cell.sheet_id);
1946                            let old = self
1947                                .get_cell_value(
1948                                    sheet_name,
1949                                    cell.coord.row() + 1,
1950                                    cell.coord.col() + 1,
1951                                )
1952                                .unwrap_or(LiteralValue::Empty);
1953                            let new = if cell.sheet_id == anchor.sheet_id
1954                                && cell.coord.row() == anchor.coord.row()
1955                                && cell.coord.col() == anchor.coord.col()
1956                            {
1957                                err_val.clone()
1958                            } else {
1959                                LiteralValue::Empty
1960                            };
1961                            Self::record_cell_if_changed(d, cell, &old, &new);
1962                        }
1963                    }
1964                }
1965                self.graph.clear_spill_region(vertex_id);
1966                if self.config.arrow_storage_enabled && self.config.delta_overlay_enabled {
1967                    let empty = LiteralValue::Empty;
1968                    for cell in spill_cells.iter() {
1969                        let sheet_name = self.graph.sheet_name(cell.sheet_id).to_string();
1970                        self.mirror_value_to_overlay(
1971                            &sheet_name,
1972                            cell.coord.row() + 1,
1973                            cell.coord.col() + 1,
1974                            &empty,
1975                        );
1976                    }
1977                }
1978                self.graph.update_vertex_value(vertex_id, err_val.clone());
1979                if self.config.arrow_storage_enabled
1980                    && self.config.delta_overlay_enabled
1981                    && self.config.write_formula_overlay_enabled
1982                {
1983                    let anchor = self
1984                        .graph
1985                        .get_cell_ref(vertex_id)
1986                        .expect("cell ref for vertex");
1987                    let sheet_name = self.graph.sheet_name(anchor.sheet_id).to_string();
1988                    self.mirror_value_to_overlay(
1989                        &sheet_name,
1990                        anchor.coord.row() + 1,
1991                        anchor.coord.col() + 1,
1992                        &err_val,
1993                    );
1994                }
1995                Ok(err_val)
1996            }
1997        }
1998    }
1999
2000    fn evaluate_named_scalar(
2001        &mut self,
2002        vertex_id: VertexId,
2003        sheet_id: SheetId,
2004    ) -> Result<LiteralValue, ExcelError> {
2005        let named_range = self.graph.named_range_by_vertex(vertex_id).ok_or_else(|| {
2006            ExcelError::new(ExcelErrorKind::Name)
2007                .with_message("Named range metadata missing".to_string())
2008        })?;
2009
2010        match &named_range.definition {
2011            NamedDefinition::Cell(cell_ref) => {
2012                let sheet_name = self.graph.sheet_name(cell_ref.sheet_id);
2013                let row = cell_ref.coord.row() + 1;
2014                let col = cell_ref.coord.col() + 1;
2015
2016                if let Some(dep_vertex) = self.graph.get_vertex_for_cell(cell_ref)
2017                    && matches!(
2018                        self.graph.get_vertex_kind(dep_vertex),
2019                        VertexKind::FormulaScalar | VertexKind::FormulaArray
2020                    )
2021                {
2022                    let value = if let Some(existing) = self.graph.get_value(dep_vertex) {
2023                        existing.clone()
2024                    } else {
2025                        let computed = self.evaluate_vertex(dep_vertex)?;
2026                        self.graph.update_vertex_value(dep_vertex, computed.clone());
2027                        computed
2028                    };
2029                    self.graph.update_vertex_value(vertex_id, value.clone());
2030                    Ok(value)
2031                } else {
2032                    let value = self
2033                        .get_cell_value(sheet_name, row, col)
2034                        .unwrap_or(LiteralValue::Empty);
2035                    self.graph.update_vertex_value(vertex_id, value.clone());
2036                    Ok(value)
2037                }
2038            }
2039            NamedDefinition::Formula { ast, .. } => {
2040                let context_sheet = match named_range.scope {
2041                    NameScope::Sheet(id) => id,
2042                    NameScope::Workbook => sheet_id,
2043                };
2044                let sheet_name = self.graph.sheet_name(context_sheet);
2045                let cell_ref = self
2046                    .graph
2047                    .get_cell_ref(vertex_id)
2048                    .unwrap_or_else(|| self.graph.make_cell_ref(sheet_name, 0, 0));
2049                let interpreter = Interpreter::new_with_cell(self, sheet_name, cell_ref);
2050                match interpreter.evaluate_ast(ast) {
2051                    Ok(cv) => {
2052                        let value = cv.into_literal();
2053                        match value {
2054                            LiteralValue::Array(_) => {
2055                                let err = ExcelError::new(ExcelErrorKind::NImpl)
2056                                    .with_message("Array result in scalar named range".to_string());
2057                                let err_val = LiteralValue::Error(err.clone());
2058                                self.graph.update_vertex_value(vertex_id, err_val.clone());
2059                                Ok(err_val)
2060                            }
2061                            other => {
2062                                self.graph.update_vertex_value(vertex_id, other.clone());
2063                                Ok(other)
2064                            }
2065                        }
2066                    }
2067                    Err(err) => {
2068                        let err_val = LiteralValue::Error(err.clone());
2069                        self.graph.update_vertex_value(vertex_id, err_val.clone());
2070                        Ok(err_val)
2071                    }
2072                }
2073            }
2074            NamedDefinition::Range(_) => Err(ExcelError::new(ExcelErrorKind::NImpl)
2075                .with_message("Array named ranges not yet implemented".to_string())),
2076        }
2077    }
2078
2079    /// Evaluate only the necessary precedents for specific target cells (demand-driven)
2080    pub fn evaluate_until(
2081        &mut self,
2082        targets: &[(&str, u32, u32)],
2083    ) -> Result<EvalResult, ExcelError> {
2084        #[cfg(feature = "tracing")]
2085        let _span_eval = tracing::info_span!("evaluate_until", targets = targets.len()).entered();
2086        let start = web_time::Instant::now();
2087        let _source_cache = self.source_cache_session();
2088
2089        // Parse target cell addresses
2090        let mut target_addrs = Vec::new();
2091        for (sheet, row, col) in targets {
2092            // For now, assume simple A1-style references on default sheet
2093            // TODO: Parse complex references with sheets
2094            let sheet_id = self.graph.sheet_id_mut(sheet);
2095            let coord = Coord::from_excel(*row, *col, true, true);
2096            target_addrs.push(CellRef::new(sheet_id, coord));
2097        }
2098
2099        // Find vertex IDs for targets
2100        let mut target_vertex_ids = Vec::new();
2101        for addr in &target_addrs {
2102            if let Some(vertex_id) = self.graph.get_vertex_id_for_address(addr) {
2103                target_vertex_ids.push(*vertex_id);
2104            }
2105        }
2106
2107        if target_vertex_ids.is_empty() {
2108            return Ok(EvalResult {
2109                computed_vertices: 0,
2110                cycle_errors: 0,
2111                elapsed: start.elapsed(),
2112            });
2113        }
2114
2115        // Build demand subgraph with virtual edges for compressed ranges
2116        #[cfg(feature = "tracing")]
2117        let _span_sub = tracing::info_span!("demand_subgraph_build").entered();
2118        let (precedents_to_eval, vdeps) = self.build_demand_subgraph(&target_vertex_ids);
2119        #[cfg(feature = "tracing")]
2120        drop(_span_sub);
2121
2122        if precedents_to_eval.is_empty() {
2123            return Ok(EvalResult {
2124                computed_vertices: 0,
2125                cycle_errors: 0,
2126                elapsed: start.elapsed(),
2127            });
2128        }
2129
2130        // Create schedule for the minimal subgraph, honoring virtual edges
2131        let scheduler = Scheduler::new(&self.graph);
2132        #[cfg(feature = "tracing")]
2133        let _span_sched =
2134            tracing::info_span!("schedule_build", vertices = precedents_to_eval.len()).entered();
2135        let schedule = scheduler.create_schedule_with_virtual(&precedents_to_eval, &vdeps)?;
2136        #[cfg(feature = "tracing")]
2137        drop(_span_sched);
2138
2139        // Handle cycles first
2140        let mut cycle_errors = 0;
2141        for cycle in &schedule.cycles {
2142            cycle_errors += 1;
2143            let circ_error = LiteralValue::Error(
2144                ExcelError::new(ExcelErrorKind::Circ)
2145                    .with_message("Circular dependency detected".to_string()),
2146            );
2147            for &vertex_id in cycle {
2148                self.graph
2149                    .update_vertex_value(vertex_id, circ_error.clone());
2150            }
2151        }
2152
2153        // Evaluate layers (parallel when enabled, mirroring evaluate_all)
2154        let mut computed_vertices = 0;
2155        for layer in &schedule.layers {
2156            if self.thread_pool.is_some() && layer.vertices.len() > 1 {
2157                computed_vertices += self.evaluate_layer_parallel(layer)?;
2158            } else {
2159                computed_vertices += self.evaluate_layer_sequential(layer)?;
2160            }
2161        }
2162
2163        // Clear warmup context at end of evaluation
2164
2165        // Clear dirty flags for evaluated vertices
2166        self.graph.clear_dirty_flags(&precedents_to_eval);
2167
2168        // Re-dirty volatile vertices
2169        self.graph.redirty_volatiles();
2170
2171        Ok(EvalResult {
2172            computed_vertices,
2173            cycle_errors,
2174            elapsed: start.elapsed(),
2175        })
2176    }
2177
2178    fn evaluate_until_with_delta_collector(
2179        &mut self,
2180        targets: &[(&str, u32, u32)],
2181        delta: &mut DeltaCollector,
2182    ) -> Result<EvalResult, ExcelError> {
2183        #[cfg(feature = "tracing")]
2184        let _span_eval =
2185            tracing::info_span!("evaluate_until_with_delta", targets = targets.len()).entered();
2186        let start = web_time::Instant::now();
2187        let _source_cache = self.source_cache_session();
2188
2189        let mut target_addrs = Vec::new();
2190        for (sheet, row, col) in targets {
2191            let sheet_id = self.graph.sheet_id_mut(sheet);
2192            let coord = Coord::from_excel(*row, *col, true, true);
2193            target_addrs.push(CellRef::new(sheet_id, coord));
2194        }
2195
2196        let mut target_vertex_ids = Vec::new();
2197        for addr in &target_addrs {
2198            if let Some(vertex_id) = self.graph.get_vertex_id_for_address(addr) {
2199                target_vertex_ids.push(*vertex_id);
2200            }
2201        }
2202
2203        if target_vertex_ids.is_empty() {
2204            return Ok(EvalResult {
2205                computed_vertices: 0,
2206                cycle_errors: 0,
2207                elapsed: start.elapsed(),
2208            });
2209        }
2210
2211        let (precedents_to_eval, vdeps) = self.build_demand_subgraph(&target_vertex_ids);
2212
2213        if precedents_to_eval.is_empty() {
2214            return Ok(EvalResult {
2215                computed_vertices: 0,
2216                cycle_errors: 0,
2217                elapsed: start.elapsed(),
2218            });
2219        }
2220
2221        let scheduler = Scheduler::new(&self.graph);
2222        let schedule = scheduler.create_schedule_with_virtual(&precedents_to_eval, &vdeps)?;
2223
2224        let mut cycle_errors = 0;
2225        let circ_error = LiteralValue::Error(
2226            ExcelError::new(ExcelErrorKind::Circ)
2227                .with_message("Circular dependency detected".to_string()),
2228        );
2229        for cycle in &schedule.cycles {
2230            cycle_errors += 1;
2231            for &vertex_id in cycle {
2232                if delta.mode != DeltaMode::Off
2233                    && let Some(cell) = self.graph.get_cell_ref_for_vertex(vertex_id)
2234                {
2235                    let old = self
2236                        .graph
2237                        .get_value(vertex_id)
2238                        .unwrap_or(LiteralValue::Empty);
2239                    if old != circ_error {
2240                        delta.record_cell(cell.sheet_id, cell.coord.row(), cell.coord.col());
2241                    }
2242                }
2243                self.graph
2244                    .update_vertex_value(vertex_id, circ_error.clone());
2245            }
2246        }
2247
2248        let mut computed_vertices = 0;
2249        for layer in &schedule.layers {
2250            if self.thread_pool.is_some() && layer.vertices.len() > 1 {
2251                computed_vertices += self.evaluate_layer_parallel_with_delta(layer, delta)?;
2252            } else {
2253                computed_vertices += self.evaluate_layer_sequential_with_delta(layer, delta)?;
2254            }
2255        }
2256
2257        self.graph.clear_dirty_flags(&precedents_to_eval);
2258        self.graph.redirty_volatiles();
2259
2260        Ok(EvalResult {
2261            computed_vertices,
2262            cycle_errors,
2263            elapsed: start.elapsed(),
2264        })
2265    }
2266
2267    /// Build a reusable evaluation plan that covers every formula vertex in the workbook.
2268    pub fn build_recalc_plan(&self) -> Result<RecalcPlan, ExcelError> {
2269        let mut vertices: Vec<VertexId> = self.graph.vertices_with_formulas().collect();
2270        vertices.sort_unstable();
2271        if vertices.is_empty() {
2272            return Ok(RecalcPlan {
2273                schedule: crate::engine::Schedule {
2274                    layers: Vec::new(),
2275                    cycles: Vec::new(),
2276                },
2277            });
2278        }
2279
2280        let scheduler = Scheduler::new(&self.graph);
2281        let schedule = scheduler.create_schedule(&vertices)?;
2282        Ok(RecalcPlan { schedule })
2283    }
2284
2285    /// Evaluate using a previously constructed plan. This avoids rebuilding layer schedules for each run.
2286    pub fn evaluate_recalc_plan(&mut self, plan: &RecalcPlan) -> Result<EvalResult, ExcelError> {
2287        let _source_cache = self.source_cache_session();
2288        if self.config.defer_graph_building {
2289            self.build_graph_all()?;
2290        }
2291
2292        let start = web_time::Instant::now();
2293        let dirty_vertices = self.graph.get_evaluation_vertices();
2294        if dirty_vertices.is_empty() {
2295            return Ok(EvalResult {
2296                computed_vertices: 0,
2297                cycle_errors: 0,
2298                elapsed: start.elapsed(),
2299            });
2300        }
2301
2302        let dirty_set: FxHashSet<VertexId> = dirty_vertices.iter().copied().collect();
2303        let mut computed_vertices = 0;
2304        let mut cycle_errors = 0;
2305
2306        if !plan.schedule.cycles.is_empty() {
2307            let circ_error = LiteralValue::Error(
2308                ExcelError::new(ExcelErrorKind::Circ)
2309                    .with_message("Circular dependency detected".to_string()),
2310            );
2311            for cycle in &plan.schedule.cycles {
2312                if !cycle.iter().any(|v| dirty_set.contains(v)) {
2313                    continue;
2314                }
2315                cycle_errors += 1;
2316                for &vertex_id in cycle {
2317                    if dirty_set.contains(&vertex_id) {
2318                        self.graph
2319                            .update_vertex_value(vertex_id, circ_error.clone());
2320                    }
2321                }
2322            }
2323        }
2324
2325        for layer in &plan.schedule.layers {
2326            let work: Vec<VertexId> = layer
2327                .vertices
2328                .iter()
2329                .copied()
2330                .filter(|v| dirty_set.contains(v))
2331                .collect();
2332            if work.is_empty() {
2333                continue;
2334            }
2335            let temp_layer = crate::engine::scheduler::Layer { vertices: work };
2336            if self.thread_pool.is_some() && temp_layer.vertices.len() > 1 {
2337                computed_vertices += self.evaluate_layer_parallel(&temp_layer)?;
2338            } else {
2339                computed_vertices += self.evaluate_layer_sequential(&temp_layer)?;
2340            }
2341        }
2342
2343        self.graph.clear_dirty_flags(&dirty_vertices);
2344        self.graph.redirty_volatiles();
2345
2346        Ok(EvalResult {
2347            computed_vertices,
2348            cycle_errors,
2349            elapsed: start.elapsed(),
2350        })
2351    }
2352
2353    /// Evaluate all dirty/volatile vertices
2354    pub fn evaluate_all(&mut self) -> Result<EvalResult, ExcelError> {
2355        let _source_cache = self.source_cache_session();
2356        if self.config.defer_graph_building {
2357            // Build graph for all staged formulas before evaluating
2358            self.build_graph_all()?;
2359        }
2360        #[cfg(feature = "tracing")]
2361        let _span_eval = tracing::info_span!("evaluate_all").entered();
2362        let start = web_time::Instant::now();
2363        let mut computed_vertices = 0;
2364        let mut cycle_errors = 0;
2365
2366        let to_evaluate = self.graph.get_evaluation_vertices();
2367        if to_evaluate.is_empty() {
2368            return Ok(EvalResult {
2369                computed_vertices,
2370                cycle_errors,
2371                elapsed: start.elapsed(),
2372            });
2373        }
2374
2375        let scheduler = Scheduler::new(&self.graph);
2376        let schedule = scheduler.create_schedule(&to_evaluate)?;
2377
2378        // Handle cycles first by marking them with #CIRC!
2379        for cycle in &schedule.cycles {
2380            cycle_errors += 1;
2381            let circ_error = LiteralValue::Error(
2382                ExcelError::new(ExcelErrorKind::Circ)
2383                    .with_message("Circular dependency detected".to_string()),
2384            );
2385            for &vertex_id in cycle {
2386                self.graph
2387                    .update_vertex_value(vertex_id, circ_error.clone());
2388            }
2389        }
2390
2391        // Evaluate acyclic layers (parallel or sequential based on config)
2392        for layer in &schedule.layers {
2393            if self.thread_pool.is_some() && layer.vertices.len() > 1 {
2394                computed_vertices += self.evaluate_layer_parallel(layer)?;
2395            } else {
2396                computed_vertices += self.evaluate_layer_sequential(layer)?;
2397            }
2398        }
2399
2400        // Clear dirty flags for all evaluated vertices (including cycles)
2401        self.graph.clear_dirty_flags(&to_evaluate);
2402
2403        // Re-dirty volatile vertices for the next evaluation cycle
2404        self.graph.redirty_volatiles();
2405
2406        // Advance recalc epoch after a full evaluation pass finishes
2407        self.recalc_epoch = self.recalc_epoch.wrapping_add(1);
2408
2409        Ok(EvalResult {
2410            computed_vertices,
2411            cycle_errors,
2412            elapsed: start.elapsed(),
2413        })
2414    }
2415
2416    pub fn evaluate_all_with_delta(&mut self) -> Result<(EvalResult, EvalDelta), ExcelError> {
2417        let mut collector = DeltaCollector::new(DeltaMode::Cells);
2418        let result = self.evaluate_all_with_delta_collector(&mut collector)?;
2419        Ok((result, collector.finish()))
2420    }
2421
2422    fn evaluate_all_with_delta_collector(
2423        &mut self,
2424        delta: &mut DeltaCollector,
2425    ) -> Result<EvalResult, ExcelError> {
2426        let _source_cache = self.source_cache_session();
2427        if self.config.defer_graph_building {
2428            self.build_graph_all()?;
2429        }
2430        #[cfg(feature = "tracing")]
2431        let _span_eval = tracing::info_span!("evaluate_all_with_delta").entered();
2432        let start = web_time::Instant::now();
2433        let mut computed_vertices = 0;
2434        let mut cycle_errors = 0;
2435
2436        let to_evaluate = self.graph.get_evaluation_vertices();
2437        if to_evaluate.is_empty() {
2438            return Ok(EvalResult {
2439                computed_vertices,
2440                cycle_errors,
2441                elapsed: start.elapsed(),
2442            });
2443        }
2444
2445        let scheduler = Scheduler::new(&self.graph);
2446        let schedule = scheduler.create_schedule(&to_evaluate)?;
2447
2448        let circ_error = LiteralValue::Error(
2449            ExcelError::new(ExcelErrorKind::Circ)
2450                .with_message("Circular dependency detected".to_string()),
2451        );
2452        for cycle in &schedule.cycles {
2453            cycle_errors += 1;
2454            for &vertex_id in cycle {
2455                if delta.mode != DeltaMode::Off
2456                    && let Some(cell) = self.graph.get_cell_ref_for_vertex(vertex_id)
2457                {
2458                    let old = self
2459                        .graph
2460                        .get_value(vertex_id)
2461                        .unwrap_or(LiteralValue::Empty);
2462                    if old != circ_error {
2463                        delta.record_cell(cell.sheet_id, cell.coord.row(), cell.coord.col());
2464                    }
2465                }
2466                self.graph
2467                    .update_vertex_value(vertex_id, circ_error.clone());
2468            }
2469        }
2470
2471        for layer in &schedule.layers {
2472            if self.thread_pool.is_some() && layer.vertices.len() > 1 {
2473                computed_vertices += self.evaluate_layer_parallel_with_delta(layer, delta)?;
2474            } else {
2475                computed_vertices += self.evaluate_layer_sequential_with_delta(layer, delta)?;
2476            }
2477        }
2478
2479        self.graph.clear_dirty_flags(&to_evaluate);
2480        self.graph.redirty_volatiles();
2481        self.recalc_epoch = self.recalc_epoch.wrapping_add(1);
2482
2483        Ok(EvalResult {
2484            computed_vertices,
2485            cycle_errors,
2486            elapsed: start.elapsed(),
2487        })
2488    }
2489
2490    /// Convenience: demand-driven evaluation of a single cell by sheet name and row/col.
2491    ///
2492    /// This will evaluate only the minimal set of dirty / volatile precedents required
2493    /// to bring the target cell up-to-date (as if a user asked for that single value),
2494    /// rather than scheduling a full workbook recalc. If the cell is already clean and
2495    /// non-volatile, no vertices will be recomputed.
2496    ///
2497    /// Returns the (possibly newly computed) value stored for the cell afterwards.
2498    /// Empty cells return None. Errors are surfaced via the Result type.
2499    pub fn evaluate_cell(
2500        &mut self,
2501        sheet: &str,
2502        row: u32,
2503        col: u32,
2504    ) -> Result<Option<LiteralValue>, ExcelError> {
2505        if row == 0 || col == 0 {
2506            return Err(ExcelError::new(ExcelErrorKind::Ref)
2507                .with_message("Row and column must be >= 1".to_string()));
2508        }
2509
2510        if self.config.defer_graph_building {
2511            self.build_graph_for_sheets(std::iter::once(sheet))?;
2512        }
2513
2514        let result = self.evaluate_cells(&[(sheet, row, col)])?;
2515
2516        match result.len() {
2517            0 => Ok(None),
2518            1 => {
2519                let v = result.into_iter().next().unwrap();
2520                Ok(v)
2521            }
2522            _ => unreachable!("evaluate_cells returned unexpected length"),
2523        }
2524    }
2525
2526    /// Convenience: demand-driven evaluation of multiple cells; accepts a slice of
2527    /// (sheet, row, col) triples. The union of required dirty / volatile precedents
2528    /// is computed once and evaluated, which is typically faster than calling
2529    /// `evaluate_cell` repeatedly for a related set of targets.
2530    ///
2531    /// Returns the resulting values for each requested target in the same order.
2532    pub fn evaluate_cells(
2533        &mut self,
2534        targets: &[(&str, u32, u32)],
2535    ) -> Result<Vec<Option<LiteralValue>>, ExcelError> {
2536        if targets.is_empty() {
2537            return Ok(Vec::new());
2538        }
2539        if self.config.defer_graph_building {
2540            let mut sheets: rustc_hash::FxHashSet<&str> = rustc_hash::FxHashSet::default();
2541            for (s, _, _) in targets.iter() {
2542                sheets.insert(*s);
2543            }
2544            self.build_graph_for_sheets(sheets.iter().cloned())?;
2545        }
2546        self.evaluate_until(targets)?;
2547        Ok(targets
2548            .iter()
2549            .map(|(s, r, c)| self.get_cell_value(s, *r, *c))
2550            .collect())
2551    }
2552
2553    pub fn evaluate_cells_cancellable(
2554        &mut self,
2555        targets: &[(&str, u32, u32)],
2556        cancel_flag: Arc<AtomicBool>,
2557    ) -> Result<Vec<Option<LiteralValue>>, ExcelError> {
2558        self.active_cancel_flag = Some(cancel_flag.clone());
2559        let res = self.evaluate_cells_cancellable_impl(targets, &cancel_flag);
2560        self.active_cancel_flag = None;
2561        res
2562    }
2563
2564    fn evaluate_cells_cancellable_impl(
2565        &mut self,
2566        targets: &[(&str, u32, u32)],
2567        cancel_flag: &AtomicBool,
2568    ) -> Result<Vec<Option<LiteralValue>>, ExcelError> {
2569        if targets.is_empty() {
2570            return Ok(Vec::new());
2571        }
2572        if self.config.defer_graph_building {
2573            let mut sheets: rustc_hash::FxHashSet<&str> = rustc_hash::FxHashSet::default();
2574            for (s, _, _) in targets.iter() {
2575                sheets.insert(*s);
2576            }
2577            self.build_graph_for_sheets(sheets.iter().cloned())?;
2578        }
2579
2580        // evaluate_until_cancellable takes &[&str] in A1 notation, but we have (&str, u32, u32)
2581        // Let's implement evaluate_until_coords_cancellable or similar, or just convert
2582        let a1_targets: Vec<String> = targets
2583            .iter()
2584            .map(|(s, r, c)| {
2585                format!("{}!{}", s, col_letters_from_1based(*c).unwrap()) + &r.to_string()
2586            })
2587            .collect();
2588        let a1_refs: Vec<&str> = a1_targets.iter().map(|s| s.as_str()).collect();
2589
2590        self.evaluate_until_cancellable_impl(&a1_refs, cancel_flag)?;
2591
2592        Ok(targets
2593            .iter()
2594            .map(|(s, r, c)| self.get_cell_value(s, *r, *c))
2595            .collect())
2596    }
2597
2598    pub fn evaluate_cells_with_delta(
2599        &mut self,
2600        targets: &[(&str, u32, u32)],
2601    ) -> Result<(Vec<Option<LiteralValue>>, EvalDelta), ExcelError> {
2602        if targets.is_empty() {
2603            return Ok((Vec::new(), EvalDelta::default()));
2604        }
2605        if self.config.defer_graph_building {
2606            let mut sheets: rustc_hash::FxHashSet<&str> = rustc_hash::FxHashSet::default();
2607            for (s, _, _) in targets.iter() {
2608                sheets.insert(*s);
2609            }
2610            self.build_graph_for_sheets(sheets.iter().cloned())?;
2611        }
2612        let mut collector = DeltaCollector::new(DeltaMode::Cells);
2613        self.evaluate_until_with_delta_collector(targets, &mut collector)?;
2614        let values = targets
2615            .iter()
2616            .map(|(s, r, c)| self.get_cell_value(s, *r, *c))
2617            .collect();
2618        Ok((values, collector.finish()))
2619    }
2620
2621    /// Get the evaluation plan for target cells without actually evaluating them
2622    pub fn get_eval_plan(&self, targets: &[(&str, u32, u32)]) -> Result<EvalPlan, ExcelError> {
2623        if targets.is_empty() {
2624            return Ok(EvalPlan {
2625                total_vertices_to_evaluate: 0,
2626                layers: Vec::new(),
2627                cycles_detected: 0,
2628                dirty_count: 0,
2629                volatile_count: 0,
2630                parallel_enabled: self.config.enable_parallel && self.thread_pool.is_some(),
2631                estimated_parallel_layers: 0,
2632                target_cells: Vec::new(),
2633            });
2634        }
2635        if self.config.defer_graph_building {
2636            return Err(ExcelError::new(ExcelErrorKind::Value).with_message(
2637                "Evaluation plan requested with deferred graph; build first or call evaluate_*",
2638            ));
2639        }
2640
2641        // Convert targets to A1 notation for consistency
2642        let addresses: Vec<String> = targets
2643            .iter()
2644            .map(|(s, r, c)| format!("{}!{}{}", s, Self::col_to_letters(*c), r))
2645            .collect();
2646
2647        // Parse target cell addresses
2648        let mut target_addrs = Vec::new();
2649        for (sheet, row, col) in targets {
2650            if let Some(sheet_id) = self.graph.sheet_id(sheet) {
2651                let coord = Coord::from_excel(*row, *col, true, true);
2652                target_addrs.push(CellRef::new(sheet_id, coord));
2653            }
2654        }
2655
2656        // Find vertex IDs for targets
2657        let mut target_vertex_ids = Vec::new();
2658        for addr in &target_addrs {
2659            if let Some(vertex_id) = self.graph.get_vertex_id_for_address(addr) {
2660                target_vertex_ids.push(*vertex_id);
2661            }
2662        }
2663
2664        if target_vertex_ids.is_empty() {
2665            return Ok(EvalPlan {
2666                total_vertices_to_evaluate: 0,
2667                layers: Vec::new(),
2668                cycles_detected: 0,
2669                dirty_count: 0,
2670                volatile_count: 0,
2671                parallel_enabled: self.config.enable_parallel && self.thread_pool.is_some(),
2672                estimated_parallel_layers: 0,
2673                target_cells: addresses,
2674            });
2675        }
2676
2677        // Build demand subgraph with virtual edges (same as evaluate_until)
2678        let (precedents_to_eval, vdeps) = self.build_demand_subgraph(&target_vertex_ids);
2679
2680        if precedents_to_eval.is_empty() {
2681            return Ok(EvalPlan {
2682                total_vertices_to_evaluate: 0,
2683                layers: Vec::new(),
2684                cycles_detected: 0,
2685                dirty_count: 0,
2686                volatile_count: 0,
2687                parallel_enabled: self.config.enable_parallel && self.thread_pool.is_some(),
2688                estimated_parallel_layers: 0,
2689                target_cells: addresses,
2690            });
2691        }
2692
2693        // Count dirty and volatile vertices
2694        let mut dirty_count = 0;
2695        let mut volatile_count = 0;
2696        for &vertex_id in &precedents_to_eval {
2697            if self.graph.is_dirty(vertex_id) {
2698                dirty_count += 1;
2699            }
2700            if self.graph.is_volatile(vertex_id) {
2701                volatile_count += 1;
2702            }
2703        }
2704
2705        // Create schedule for the minimal subgraph honoring virtual edges
2706        let scheduler = Scheduler::new(&self.graph);
2707        let schedule = scheduler.create_schedule_with_virtual(&precedents_to_eval, &vdeps)?;
2708
2709        // Build layer information
2710        let mut layers = Vec::new();
2711        let mut estimated_parallel_layers = 0;
2712        let parallel_enabled = self.config.enable_parallel && self.thread_pool.is_some();
2713
2714        for layer in &schedule.layers {
2715            let parallel_eligible = parallel_enabled && layer.vertices.len() > 1;
2716            if parallel_eligible {
2717                estimated_parallel_layers += 1;
2718            }
2719
2720            // Get sample cell addresses (up to 5)
2721            let sample_cells: Vec<String> = layer
2722                .vertices
2723                .iter()
2724                .take(5)
2725                .filter_map(|&vertex_id| {
2726                    self.graph
2727                        .get_cell_ref_for_vertex(vertex_id)
2728                        .map(|cell_ref| {
2729                            let sheet_name = self.graph.sheet_name(cell_ref.sheet_id);
2730                            format!(
2731                                "{}!{}{}",
2732                                sheet_name,
2733                                Self::col_to_letters(cell_ref.coord.col()),
2734                                cell_ref.coord.row() + 1
2735                            )
2736                        })
2737                })
2738                .collect();
2739
2740            layers.push(LayerInfo {
2741                vertex_count: layer.vertices.len(),
2742                parallel_eligible,
2743                sample_cells,
2744            });
2745        }
2746
2747        Ok(EvalPlan {
2748            total_vertices_to_evaluate: precedents_to_eval.len(),
2749            layers,
2750            cycles_detected: schedule.cycles.len(),
2751            dirty_count,
2752            volatile_count,
2753            parallel_enabled,
2754            estimated_parallel_layers,
2755            target_cells: addresses,
2756        })
2757    }
2758
2759    /// Build a demand-driven subgraph for the given targets, including ephemeral edges for
2760    /// compressed ranges, and returning the set of dirty/volatile precedents and virtual deps.
2761    fn build_demand_subgraph(
2762        &self,
2763        target_vertices: &[VertexId],
2764    ) -> (
2765        Vec<VertexId>,
2766        rustc_hash::FxHashMap<VertexId, Vec<VertexId>>,
2767    ) {
2768        #[cfg(feature = "tracing")]
2769        let _span =
2770            tracing::info_span!("demand_subgraph", targets = target_vertices.len()).entered();
2771        use rustc_hash::{FxHashMap, FxHashSet};
2772
2773        let mut to_evaluate: FxHashSet<VertexId> = FxHashSet::default();
2774        let mut visited: FxHashSet<VertexId> = FxHashSet::default();
2775        let mut stack: Vec<VertexId> = Vec::new();
2776        let mut vdeps: FxHashMap<VertexId, Vec<VertexId>> = FxHashMap::default(); // incoming deps per vertex
2777
2778        for &t in target_vertices {
2779            stack.push(t);
2780        }
2781
2782        while let Some(v) = stack.pop() {
2783            if !visited.insert(v) {
2784                continue;
2785            }
2786            if !self.graph.vertex_exists(v) {
2787                continue;
2788            }
2789            // Only schedule dirty/volatile formulas
2790            match self.graph.get_vertex_kind(v) {
2791                VertexKind::FormulaScalar | VertexKind::FormulaArray => {
2792                    if self.graph.is_dirty(v) || self.graph.is_volatile(v) {
2793                        to_evaluate.insert(v);
2794                    }
2795                }
2796                _ => {}
2797            }
2798
2799            // Explicit dependencies (graph edges)
2800            for dep in self.graph.get_dependencies(v) {
2801                if self.graph.vertex_exists(dep) {
2802                    match self.graph.get_vertex_kind(dep) {
2803                        VertexKind::FormulaScalar | VertexKind::FormulaArray => {
2804                            if !visited.contains(&dep) {
2805                                stack.push(dep);
2806                            }
2807                        }
2808                        _ => {}
2809                    }
2810                }
2811            }
2812
2813            // Compressed range dependencies → discover formula precedents in used/bounded window
2814            if let Some(ranges) = self.graph.get_range_dependencies(v) {
2815                let current_sheet_id = self.graph.get_vertex_sheet_id(v);
2816                for r in ranges {
2817                    let sheet_id = match r.sheet {
2818                        formualizer_common::SheetLocator::Id(id) => id,
2819                        _ => current_sheet_id,
2820                    };
2821                    let sheet_name = self.graph.sheet_name(sheet_id);
2822
2823                    // Infer bounds like resolve_range_view (r bounds are 0-based)
2824                    let mut sr = r.start_row.map(|b| b.index + 1);
2825                    let mut sc = r.start_col.map(|b| b.index + 1);
2826                    let mut er = r.end_row.map(|b| b.index + 1);
2827                    let mut ec = r.end_col.map(|b| b.index + 1);
2828
2829                    if sr.is_none() && er.is_none() {
2830                        let scv = sc.unwrap_or(1);
2831                        let ecv = ec.unwrap_or(scv);
2832                        if let Some((min_r, max_r)) =
2833                            self.used_rows_for_columns(sheet_name, scv, ecv)
2834                        {
2835                            sr = Some(min_r);
2836                            er = Some(max_r);
2837                        } else if let Some((max_rows, _)) = self.sheet_bounds(sheet_name) {
2838                            sr = Some(1);
2839                            er = Some(self.config.max_open_ended_rows);
2840                        }
2841                    }
2842                    if sc.is_none() && ec.is_none() {
2843                        let srv = sr.unwrap_or(1);
2844                        let erv = er.unwrap_or(srv);
2845                        if let Some((min_c, max_c)) = self.used_cols_for_rows(sheet_name, srv, erv)
2846                        {
2847                            sc = Some(min_c);
2848                            ec = Some(max_c);
2849                        } else if let Some((_, max_cols)) = self.sheet_bounds(sheet_name) {
2850                            sc = Some(1);
2851                            ec = Some(self.config.max_open_ended_cols);
2852                        }
2853                    }
2854                    if sr.is_some() && er.is_none() {
2855                        let scv = sc.unwrap_or(1);
2856                        let ecv = ec.unwrap_or(scv);
2857                        if let Some((_, max_r)) = self.used_rows_for_columns(sheet_name, scv, ecv) {
2858                            er = Some(max_r);
2859                        } else if let Some((max_rows, _)) = self.sheet_bounds(sheet_name) {
2860                            er = Some(self.config.max_open_ended_rows);
2861                        }
2862                    }
2863                    if er.is_some() && sr.is_none() {
2864                        let scv = sc.unwrap_or(1);
2865                        let ecv = ec.unwrap_or(scv);
2866                        if let Some((min_r, _)) = self.used_rows_for_columns(sheet_name, scv, ecv) {
2867                            sr = Some(min_r);
2868                        } else {
2869                            sr = Some(1);
2870                        }
2871                    }
2872                    if sc.is_some() && ec.is_none() {
2873                        let srv = sr.unwrap_or(1);
2874                        let erv = er.unwrap_or(srv);
2875                        if let Some((_, max_c)) = self.used_cols_for_rows(sheet_name, srv, erv) {
2876                            ec = Some(max_c);
2877                        } else if let Some((_, max_cols)) = self.sheet_bounds(sheet_name) {
2878                            ec = Some(self.config.max_open_ended_cols);
2879                        }
2880                    }
2881                    if ec.is_some() && sc.is_none() {
2882                        let srv = sr.unwrap_or(1);
2883                        let erv = er.unwrap_or(srv);
2884                        if let Some((min_c, _)) = self.used_cols_for_rows(sheet_name, srv, erv) {
2885                            sc = Some(min_c);
2886                        } else {
2887                            sc = Some(1);
2888                        }
2889                    }
2890
2891                    let sr = sr.unwrap_or(1);
2892                    let sc = sc.unwrap_or(1);
2893                    let er = er.unwrap_or(sr.saturating_sub(1));
2894                    let ec = ec.unwrap_or(sc.saturating_sub(1));
2895                    if er < sr || ec < sc {
2896                        continue;
2897                    }
2898
2899                    if let Some(index) = self.graph.sheet_index(sheet_id) {
2900                        let sr0 = sr.saturating_sub(1);
2901                        let er0 = er.saturating_sub(1);
2902                        let sc0 = sc.saturating_sub(1);
2903                        let ec0 = ec.saturating_sub(1);
2904                        // enumerate vertices in col range, filter row and kind
2905                        for u in index.vertices_in_col_range(sc0, ec0) {
2906                            let pc = self.graph.vertex_coord(u);
2907                            let row0 = pc.row();
2908                            if row0 < sr0 || row0 > er0 {
2909                                continue;
2910                            }
2911                            match self.graph.get_vertex_kind(u) {
2912                                VertexKind::FormulaScalar | VertexKind::FormulaArray => {
2913                                    // only link and schedule if producer is dirty/volatile
2914                                    if (self.graph.is_dirty(u) || self.graph.is_volatile(u))
2915                                        && u != v
2916                                    {
2917                                        vdeps.entry(v).or_default().push(u);
2918                                        if !visited.contains(&u) {
2919                                            stack.push(u);
2920                                        }
2921                                    }
2922                                }
2923                                _ => {}
2924                            }
2925                        }
2926                    }
2927                }
2928            }
2929        }
2930
2931        let mut result: Vec<VertexId> = to_evaluate.into_iter().collect();
2932        result.sort_unstable();
2933        // Dedup virtual deps
2934        for deps in vdeps.values_mut() {
2935            deps.sort_unstable();
2936            deps.dedup();
2937        }
2938        (result, vdeps)
2939    }
2940
2941    /// Helper: convert 1-based column index to Excel-style letters (1 -> A, 27 -> AA)
2942    fn col_to_letters(col: u32) -> String {
2943        col_letters_from_1based(col).expect("column index must be >= 1")
2944    }
2945
2946    /// Evaluate all dirty/volatile vertices with cancellation support
2947    pub fn evaluate_all_cancellable(
2948        &mut self,
2949        cancel_flag: Arc<AtomicBool>,
2950    ) -> Result<EvalResult, ExcelError> {
2951        self.active_cancel_flag = Some(cancel_flag.clone());
2952        let res = self.evaluate_all_cancellable_impl(&cancel_flag);
2953        self.active_cancel_flag = None;
2954        res
2955    }
2956
2957    fn evaluate_all_cancellable_impl(
2958        &mut self,
2959        cancel_flag: &AtomicBool,
2960    ) -> Result<EvalResult, ExcelError> {
2961        let start = web_time::Instant::now();
2962        let mut computed_vertices = 0;
2963        let mut cycle_errors = 0;
2964
2965        let to_evaluate = self.graph.get_evaluation_vertices();
2966        if to_evaluate.is_empty() {
2967            return Ok(EvalResult {
2968                computed_vertices,
2969                cycle_errors,
2970                elapsed: start.elapsed(),
2971            });
2972        }
2973
2974        let scheduler = Scheduler::new(&self.graph);
2975        let schedule = scheduler.create_schedule(&to_evaluate)?;
2976
2977        // Handle cycles first by marking them with #CIRC!
2978        for cycle in &schedule.cycles {
2979            // Check cancellation between cycles
2980            if cancel_flag.load(Ordering::Relaxed) {
2981                return Err(ExcelError::new(ExcelErrorKind::Cancelled)
2982                    .with_message("Evaluation cancelled during cycle handling".to_string()));
2983            }
2984
2985            cycle_errors += 1;
2986            let circ_error = LiteralValue::Error(
2987                ExcelError::new(ExcelErrorKind::Circ)
2988                    .with_message("Circular dependency detected".to_string()),
2989            );
2990            for &vertex_id in cycle {
2991                self.graph
2992                    .update_vertex_value(vertex_id, circ_error.clone());
2993            }
2994        }
2995
2996        // Evaluate acyclic layers sequentially with cancellation checks
2997        for layer in &schedule.layers {
2998            // Check cancellation between layers
2999            if cancel_flag.load(Ordering::Relaxed) {
3000                return Err(ExcelError::new(ExcelErrorKind::Cancelled)
3001                    .with_message("Evaluation cancelled between layers".to_string()));
3002            }
3003
3004            // Evaluate vertices in this layer (parallel or sequential)
3005            if self.thread_pool.is_some() && layer.vertices.len() > 1 {
3006                computed_vertices +=
3007                    self.evaluate_layer_parallel_cancellable(layer, cancel_flag)?;
3008            } else {
3009                computed_vertices +=
3010                    self.evaluate_layer_sequential_cancellable(layer, cancel_flag)?;
3011            }
3012        }
3013
3014        // Clear dirty flags for all evaluated vertices (including cycles)
3015        self.graph.clear_dirty_flags(&to_evaluate);
3016
3017        // Re-dirty volatile vertices for the next evaluation cycle
3018        self.graph.redirty_volatiles();
3019
3020        Ok(EvalResult {
3021            computed_vertices,
3022            cycle_errors,
3023            elapsed: start.elapsed(),
3024        })
3025    }
3026
3027    /// Evaluate only the necessary precedents for specific target cells with cancellation support
3028    pub fn evaluate_until_cancellable(
3029        &mut self,
3030        targets: &[&str],
3031        cancel_flag: Arc<AtomicBool>,
3032    ) -> Result<EvalResult, ExcelError> {
3033        self.active_cancel_flag = Some(cancel_flag.clone());
3034        let res = self.evaluate_until_cancellable_impl(targets, &cancel_flag);
3035        self.active_cancel_flag = None;
3036        res
3037    }
3038
3039    fn evaluate_until_cancellable_impl(
3040        &mut self,
3041        targets: &[&str],
3042        cancel_flag: &AtomicBool,
3043    ) -> Result<EvalResult, ExcelError> {
3044        let start = web_time::Instant::now();
3045
3046        // Parse target cell addresses
3047        let mut target_addrs = Vec::new();
3048        for target in targets {
3049            let (sheet, row, col) = self.parse_a1_notation(target)?;
3050            let sheet_id = self.graph.sheet_id_mut(&sheet);
3051            let coord = Coord::from_excel(row, col, true, true);
3052            target_addrs.push(CellRef::new(sheet_id, coord));
3053        }
3054
3055        // Find vertex IDs for targets
3056        let mut target_vertex_ids = Vec::new();
3057        for addr in &target_addrs {
3058            if let Some(vertex_id) = self.graph.get_vertex_id_for_address(addr) {
3059                target_vertex_ids.push(*vertex_id);
3060            }
3061        }
3062
3063        if target_vertex_ids.is_empty() {
3064            return Ok(EvalResult {
3065                computed_vertices: 0,
3066                cycle_errors: 0,
3067                elapsed: start.elapsed(),
3068            });
3069        }
3070
3071        // Build demand subgraph with virtual edges
3072        let (precedents_to_eval, vdeps) = self.build_demand_subgraph(&target_vertex_ids);
3073
3074        if precedents_to_eval.is_empty() {
3075            return Ok(EvalResult {
3076                computed_vertices: 0,
3077                cycle_errors: 0,
3078                elapsed: start.elapsed(),
3079            });
3080        }
3081
3082        // Create schedule honoring virtual edges
3083        let scheduler = Scheduler::new(&self.graph);
3084        let schedule = scheduler.create_schedule_with_virtual(&precedents_to_eval, &vdeps)?;
3085
3086        // Handle cycles first
3087        let mut cycle_errors = 0;
3088        for cycle in &schedule.cycles {
3089            // Check cancellation between cycles
3090            if cancel_flag.load(Ordering::Relaxed) {
3091                return Err(ExcelError::new(ExcelErrorKind::Cancelled).with_message(
3092                    "Demand-driven evaluation cancelled during cycle handling".to_string(),
3093                ));
3094            }
3095
3096            cycle_errors += 1;
3097            let circ_error = LiteralValue::Error(
3098                ExcelError::new(ExcelErrorKind::Circ)
3099                    .with_message("Circular dependency detected".to_string()),
3100            );
3101            for &vertex_id in cycle {
3102                self.graph
3103                    .update_vertex_value(vertex_id, circ_error.clone());
3104            }
3105        }
3106
3107        // Evaluate layers with cancellation checks
3108        let mut computed_vertices = 0;
3109        for layer in &schedule.layers {
3110            // Check cancellation between layers
3111            if cancel_flag.load(Ordering::Relaxed) {
3112                return Err(ExcelError::new(ExcelErrorKind::Cancelled).with_message(
3113                    "Demand-driven evaluation cancelled between layers".to_string(),
3114                ));
3115            }
3116
3117            // Evaluate vertices in this layer (parallel or sequential)
3118            if self.thread_pool.is_some() && layer.vertices.len() > 1 {
3119                computed_vertices +=
3120                    self.evaluate_layer_parallel_cancellable(layer, cancel_flag)?;
3121            } else {
3122                computed_vertices +=
3123                    self.evaluate_layer_sequential_cancellable_demand_driven(layer, cancel_flag)?;
3124            }
3125        }
3126
3127        // Clear dirty flags for evaluated vertices
3128        self.graph.clear_dirty_flags(&precedents_to_eval);
3129
3130        // Re-dirty volatile vertices
3131        self.graph.redirty_volatiles();
3132
3133        Ok(EvalResult {
3134            computed_vertices,
3135            cycle_errors,
3136            elapsed: start.elapsed(),
3137        })
3138    }
3139
3140    fn parse_a1_notation(&self, address: &str) -> Result<(String, u32, u32), ExcelError> {
3141        let mut parts = address.splitn(2, '!');
3142        let first = parts.next().unwrap_or_default();
3143        let remainder = parts.next();
3144
3145        let (sheet, cell_part) = match remainder {
3146            Some(cell) => (first.to_string(), cell),
3147            None => (self.default_sheet_name().to_string(), first),
3148        };
3149
3150        let (row, col, _, _) = parse_a1_1based(cell_part).map_err(|err| {
3151            ExcelError::new(ExcelErrorKind::Ref)
3152                .with_message(format!("Invalid cell reference `{cell_part}`: {err}"))
3153        })?;
3154
3155        Ok((sheet, row, col))
3156    }
3157
3158    /// Determine volatility using this engine's FunctionProvider, falling back to global registry.
3159    fn is_ast_volatile_with_provider(&self, ast: &ASTNode) -> bool {
3160        use formualizer_parse::parser::ASTNodeType;
3161        match &ast.node_type {
3162            ASTNodeType::Function { name, args, .. } => {
3163                if let Some(func) = self
3164                    .get_function("", name)
3165                    .or_else(|| crate::function_registry::get("", name))
3166                    && func.caps().contains(crate::function::FnCaps::VOLATILE)
3167                {
3168                    return true;
3169                }
3170                args.iter()
3171                    .any(|arg| self.is_ast_volatile_with_provider(arg))
3172            }
3173            ASTNodeType::BinaryOp { left, right, .. } => {
3174                self.is_ast_volatile_with_provider(left)
3175                    || self.is_ast_volatile_with_provider(right)
3176            }
3177            ASTNodeType::UnaryOp { expr, .. } => self.is_ast_volatile_with_provider(expr),
3178            ASTNodeType::Array(rows) => rows.iter().any(|row| {
3179                row.iter()
3180                    .any(|cell| self.is_ast_volatile_with_provider(cell))
3181            }),
3182            _ => false,
3183        }
3184    }
3185
3186    /// Find dirty precedents that need evaluation for the given target vertices
3187    fn find_dirty_precedents(&self, target_vertices: &[VertexId]) -> Vec<VertexId> {
3188        let mut to_evaluate = FxHashSet::default();
3189        let mut visited = FxHashSet::default();
3190        let mut stack = Vec::new();
3191
3192        // Start reverse traversal from target vertices
3193        for &target in target_vertices {
3194            stack.push(target);
3195        }
3196
3197        while let Some(vertex_id) = stack.pop() {
3198            if !visited.insert(vertex_id) {
3199                continue; // Already processed
3200            }
3201
3202            if self.graph.vertex_exists(vertex_id) {
3203                // Check if this vertex needs evaluation
3204                let kind = self.graph.get_vertex_kind(vertex_id);
3205                let needs_eval = match kind {
3206                    super::vertex::VertexKind::FormulaScalar
3207                    | super::vertex::VertexKind::FormulaArray => {
3208                        self.graph.is_dirty(vertex_id) || self.graph.is_volatile(vertex_id)
3209                    }
3210                    _ => false, // Values and empty cells don't need evaluation
3211                };
3212
3213                if needs_eval {
3214                    to_evaluate.insert(vertex_id);
3215                }
3216
3217                // Continue traversal to dependencies (precedents)
3218                let dependencies = self.graph.get_dependencies(vertex_id);
3219                for &dep_id in &dependencies {
3220                    if !visited.contains(&dep_id) {
3221                        stack.push(dep_id);
3222                    }
3223                }
3224            }
3225        }
3226
3227        let mut result: Vec<VertexId> = to_evaluate.into_iter().collect();
3228        result.sort_unstable();
3229        result
3230    }
3231
3232    /// Evaluate a layer sequentially
3233    fn evaluate_layer_sequential(
3234        &mut self,
3235        layer: &super::scheduler::Layer,
3236    ) -> Result<usize, ExcelError> {
3237        for &vertex_id in &layer.vertices {
3238            self.evaluate_vertex(vertex_id)?;
3239        }
3240        Ok(layer.vertices.len())
3241    }
3242
3243    fn update_vertex_value_with_delta(
3244        &mut self,
3245        vertex_id: VertexId,
3246        new_value: LiteralValue,
3247        delta: &mut DeltaCollector,
3248    ) {
3249        if delta.mode != DeltaMode::Off
3250            && let Some(cell) = self.graph.get_cell_ref_for_vertex(vertex_id)
3251        {
3252            let old = self
3253                .graph
3254                .get_value(vertex_id)
3255                .unwrap_or(LiteralValue::Empty);
3256            if old != new_value {
3257                delta.record_cell(cell.sheet_id, cell.coord.row(), cell.coord.col());
3258            }
3259        }
3260        self.graph.update_vertex_value(vertex_id, new_value.clone());
3261        self.mirror_vertex_value_to_overlay(vertex_id, &new_value);
3262    }
3263
3264    fn evaluate_layer_sequential_with_delta(
3265        &mut self,
3266        layer: &super::scheduler::Layer,
3267        delta: &mut DeltaCollector,
3268    ) -> Result<usize, ExcelError> {
3269        for &vertex_id in &layer.vertices {
3270            self.evaluate_vertex_impl(vertex_id, Some(delta))?;
3271        }
3272        Ok(layer.vertices.len())
3273    }
3274
3275    /// Evaluate a layer sequentially with cancellation support
3276    fn evaluate_layer_sequential_cancellable(
3277        &mut self,
3278        layer: &super::scheduler::Layer,
3279        cancel_flag: &AtomicBool,
3280    ) -> Result<usize, ExcelError> {
3281        for (i, &vertex_id) in layer.vertices.iter().enumerate() {
3282            // Check cancellation every 256 vertices to balance responsiveness with performance
3283            if i % 256 == 0 && cancel_flag.load(Ordering::Relaxed) {
3284                return Err(ExcelError::new(ExcelErrorKind::Cancelled)
3285                    .with_message("Evaluation cancelled within layer".to_string()));
3286            }
3287
3288            self.evaluate_vertex(vertex_id)?;
3289        }
3290        Ok(layer.vertices.len())
3291    }
3292
3293    /// Evaluate a layer sequentially with more frequent cancellation checks for demand-driven evaluation
3294    fn evaluate_layer_sequential_cancellable_demand_driven(
3295        &mut self,
3296        layer: &super::scheduler::Layer,
3297        cancel_flag: &AtomicBool,
3298    ) -> Result<usize, ExcelError> {
3299        for (i, &vertex_id) in layer.vertices.iter().enumerate() {
3300            // Check cancellation more frequently for demand-driven evaluation (every 128 vertices)
3301            if i % 128 == 0 && cancel_flag.load(Ordering::Relaxed) {
3302                return Err(ExcelError::new(ExcelErrorKind::Cancelled)
3303                    .with_message("Demand-driven evaluation cancelled within layer".to_string()));
3304            }
3305
3306            self.evaluate_vertex(vertex_id)?;
3307        }
3308        Ok(layer.vertices.len())
3309    }
3310
3311    /// Evaluate a layer in parallel using the thread pool
3312    fn evaluate_layer_parallel(
3313        &mut self,
3314        layer: &super::scheduler::Layer,
3315    ) -> Result<usize, ExcelError> {
3316        use rayon::prelude::*;
3317
3318        let thread_pool = self.thread_pool.as_ref().unwrap();
3319
3320        // Collect all evaluation results first, then update the graph sequentially
3321        let results: Result<Vec<(VertexId, LiteralValue)>, ExcelError> =
3322            thread_pool.install(|| {
3323                layer
3324                    .vertices
3325                    .par_iter()
3326                    .map(
3327                        |&vertex_id| match self.evaluate_vertex_immutable(vertex_id) {
3328                            Ok(v) => Ok((vertex_id, v)),
3329                            Err(e) => Ok((vertex_id, LiteralValue::Error(e))),
3330                        },
3331                    )
3332                    .collect()
3333            });
3334
3335        // Update the graph with results sequentially (thread-safe)
3336        match results {
3337            Ok(vertex_results) => {
3338                for (vertex_id, result) in vertex_results {
3339                    self.graph.update_vertex_value(vertex_id, result.clone());
3340                    self.mirror_vertex_value_to_overlay(vertex_id, &result);
3341                }
3342                Ok(layer.vertices.len())
3343            }
3344            Err(e) => Err(e),
3345        }
3346    }
3347
3348    fn evaluate_layer_parallel_with_delta(
3349        &mut self,
3350        layer: &super::scheduler::Layer,
3351        delta: &mut DeltaCollector,
3352    ) -> Result<usize, ExcelError> {
3353        use rayon::prelude::*;
3354
3355        let thread_pool = self.thread_pool.as_ref().unwrap();
3356
3357        let results: Result<Vec<(VertexId, LiteralValue)>, ExcelError> =
3358            thread_pool.install(|| {
3359                layer
3360                    .vertices
3361                    .par_iter()
3362                    .map(
3363                        |&vertex_id| match self.evaluate_vertex_immutable(vertex_id) {
3364                            Ok(v) => Ok((vertex_id, v)),
3365                            Err(e) => Ok((vertex_id, LiteralValue::Error(e))),
3366                        },
3367                    )
3368                    .collect()
3369            });
3370
3371        match results {
3372            Ok(vertex_results) => {
3373                for (vertex_id, result) in vertex_results {
3374                    self.update_vertex_value_with_delta(vertex_id, result, delta);
3375                }
3376                Ok(layer.vertices.len())
3377            }
3378            Err(e) => Err(e),
3379        }
3380    }
3381
3382    /// Evaluate a layer in parallel with cancellation support
3383    fn evaluate_layer_parallel_cancellable(
3384        &mut self,
3385        layer: &super::scheduler::Layer,
3386        cancel_flag: &AtomicBool,
3387    ) -> Result<usize, ExcelError> {
3388        use rayon::prelude::*;
3389
3390        let thread_pool = self.thread_pool.as_ref().unwrap();
3391
3392        // Check cancellation before starting parallel work
3393        if cancel_flag.load(Ordering::Relaxed) {
3394            return Err(ExcelError::new(ExcelErrorKind::Cancelled)
3395                .with_message("Parallel evaluation cancelled before starting".to_string()));
3396        }
3397
3398        // Collect all evaluation results first, then update the graph sequentially
3399        let results: Result<Vec<(VertexId, LiteralValue)>, ExcelError> =
3400            thread_pool.install(|| {
3401                layer
3402                    .vertices
3403                    .par_iter()
3404                    .map(|&vertex_id| {
3405                        // Check cancellation periodically during parallel work
3406                        if cancel_flag.load(Ordering::Relaxed) {
3407                            return Err(ExcelError::new(ExcelErrorKind::Cancelled).with_message(
3408                                "Parallel evaluation cancelled during execution".to_string(),
3409                            ));
3410                        }
3411                        match self.evaluate_vertex_immutable(vertex_id) {
3412                            Ok(v) => Ok((vertex_id, v)),
3413                            Err(e) => Ok((vertex_id, LiteralValue::Error(e))),
3414                        }
3415                    })
3416                    .collect()
3417            });
3418
3419        // Update the graph with results sequentially (thread-safe)
3420        match results {
3421            Ok(vertex_results) => {
3422                for (vertex_id, result) in vertex_results {
3423                    self.graph.update_vertex_value(vertex_id, result.clone());
3424                    self.mirror_vertex_value_to_overlay(vertex_id, &result);
3425                }
3426                Ok(layer.vertices.len())
3427            }
3428            Err(e) => Err(e),
3429        }
3430    }
3431
3432    /// Evaluate a single vertex without mutating the graph (for parallel evaluation)
3433    fn evaluate_vertex_immutable(&self, vertex_id: VertexId) -> Result<LiteralValue, ExcelError> {
3434        // Check if vertex exists
3435        if !self.graph.vertex_exists(vertex_id) {
3436            return Err(ExcelError::new(formualizer_common::ExcelErrorKind::Ref)
3437                .with_message(format!("Vertex not found: {vertex_id:?}")));
3438        }
3439
3440        // Get vertex kind and check if it needs evaluation
3441        let kind = self.graph.get_vertex_kind(vertex_id);
3442        let sheet_id = self.graph.get_vertex_sheet_id(vertex_id);
3443
3444        let ast_id = match kind {
3445            VertexKind::FormulaScalar => {
3446                if let Some(ast_id) = self.graph.get_formula_id(vertex_id) {
3447                    ast_id
3448                } else {
3449                    return Ok(LiteralValue::Int(0));
3450                }
3451            }
3452            VertexKind::Empty | VertexKind::Cell => {
3453                // Check if there's a value stored
3454                if let Some(value) = self.graph.get_value(vertex_id) {
3455                    return Ok(value.clone());
3456                } else {
3457                    return Ok(LiteralValue::Int(0)); // Empty cells evaluate to 0
3458                }
3459            }
3460            VertexKind::NamedScalar => {
3461                let named_range = self.graph.named_range_by_vertex(vertex_id).ok_or_else(|| {
3462                    ExcelError::new(ExcelErrorKind::Name)
3463                        .with_message("Named range metadata missing".to_string())
3464                })?;
3465
3466                return match &named_range.definition {
3467                    NamedDefinition::Cell(cell_ref) => {
3468                        if let Some(dep_vertex) = self.graph.get_vertex_for_cell(cell_ref)
3469                            && let Some(existing) = self.graph.get_value(dep_vertex)
3470                        {
3471                            return Ok(existing.clone());
3472                        }
3473                        let sheet_name = self.graph.sheet_name(cell_ref.sheet_id);
3474                        Ok(self
3475                            .graph
3476                            .get_cell_value(sheet_name, cell_ref.coord.row(), cell_ref.coord.col())
3477                            .unwrap_or(LiteralValue::Empty))
3478                    }
3479                    NamedDefinition::Formula { ast, .. } => {
3480                        let context_sheet = match named_range.scope {
3481                            NameScope::Sheet(id) => id,
3482                            NameScope::Workbook => sheet_id,
3483                        };
3484                        let sheet_name = self.graph.sheet_name(context_sheet);
3485                        let cell_ref = self
3486                            .graph
3487                            .get_cell_ref(vertex_id)
3488                            .unwrap_or_else(|| self.graph.make_cell_ref(sheet_name, 0, 0));
3489                        let interpreter = Interpreter::new_with_cell(self, sheet_name, cell_ref);
3490                        interpreter.evaluate_ast(ast).map(|cv| cv.into_literal())
3491                    }
3492                    NamedDefinition::Range(_) => Err(ExcelError::new(ExcelErrorKind::NImpl)
3493                        .with_message("Array named ranges not yet implemented".to_string())),
3494                };
3495            }
3496            _ => {
3497                return Ok(LiteralValue::Error(
3498                    ExcelError::new(formualizer_common::ExcelErrorKind::Na)
3499                        .with_message("Array formulas not yet supported".to_string()),
3500                ));
3501            }
3502        };
3503
3504        // The interpreter uses a reference to the engine as the context
3505        let sheet_name = self.graph.sheet_name(sheet_id);
3506        let cell_ref = self
3507            .graph
3508            .get_cell_ref(vertex_id)
3509            .expect("cell ref for vertex");
3510        let interpreter = Interpreter::new_with_cell(self, sheet_name, cell_ref);
3511
3512        interpreter
3513            .evaluate_arena_ast(ast_id, self.graph.data_store(), self.graph.sheet_reg())
3514            .map(|cv| cv.into_literal())
3515    }
3516
3517    /// Get access to the shared thread pool for parallel evaluation
3518    pub fn thread_pool(&self) -> Option<&Arc<rayon::ThreadPool>> {
3519        self.thread_pool.as_ref()
3520    }
3521}
3522
3523#[derive(Default)]
3524struct RowBoundsCache {
3525    snapshot: u64,
3526    // key: (sheet_id, col_idx)
3527    map: rustc_hash::FxHashMap<(u32, usize), (Option<u32>, Option<u32>)>,
3528}
3529
3530impl RowBoundsCache {
3531    fn new(snapshot: u64) -> Self {
3532        Self {
3533            snapshot,
3534            map: Default::default(),
3535        }
3536    }
3537    fn get_row_bounds(
3538        &self,
3539        sheet_id: SheetId,
3540        col_idx: usize,
3541        snapshot: u64,
3542    ) -> Option<(Option<u32>, Option<u32>)> {
3543        if self.snapshot != snapshot {
3544            return None;
3545        }
3546        self.map.get(&(sheet_id as u32, col_idx)).copied()
3547    }
3548    fn put_row_bounds(
3549        &mut self,
3550        sheet_id: SheetId,
3551        col_idx: usize,
3552        snapshot: u64,
3553        bounds: (Option<u32>, Option<u32>),
3554    ) {
3555        if self.snapshot != snapshot {
3556            self.snapshot = snapshot;
3557            self.map.clear();
3558        }
3559        self.map.insert((sheet_id as u32, col_idx), bounds);
3560    }
3561}
3562
3563// Phase 2 shim: in-process spill manager delegating to current graph methods.
3564#[derive(Default)]
3565pub struct ShimSpillManager {
3566    region_locks: RegionLockManager,
3567    pub(crate) active_locks: rustc_hash::FxHashMap<VertexId, u64>,
3568}
3569
3570impl ShimSpillManager {
3571    pub(crate) fn reserve(
3572        &mut self,
3573        owner: VertexId,
3574        anchor_cell: CellRef,
3575        shape: SpillShape,
3576        _meta: SpillMeta,
3577    ) -> Result<(), ExcelError> {
3578        // Derive region from anchor + shape; enforce in-flight exclusivity only.
3579        let region = crate::engine::spill::Region {
3580            sheet_id: anchor_cell.sheet_id as u32,
3581            row_start: anchor_cell.coord.row(),
3582            row_end: anchor_cell
3583                .coord
3584                .row()
3585                .saturating_add(shape.rows)
3586                .saturating_sub(1),
3587            col_start: anchor_cell.coord.col(),
3588            col_end: anchor_cell
3589                .coord
3590                .col()
3591                .saturating_add(shape.cols)
3592                .saturating_sub(1),
3593        };
3594        match self.region_locks.reserve(region, owner) {
3595            Ok(id) => {
3596                if id != 0 {
3597                    self.active_locks.insert(owner, id);
3598                }
3599                Ok(())
3600            }
3601            Err(e) => Err(e),
3602        }
3603    }
3604
3605    pub(crate) fn commit_array(
3606        &mut self,
3607        graph: &mut DependencyGraph,
3608        anchor_vertex: VertexId,
3609        targets: &[CellRef],
3610        rows: Vec<Vec<LiteralValue>>,
3611    ) -> Result<(), ExcelError> {
3612        // Re-run plan on concrete targets before committing to respect blockers.
3613        let plan_res = graph.plan_spill_region(anchor_vertex, targets);
3614        if let Err(e) = plan_res {
3615            if let Some(id) = self.active_locks.remove(&anchor_vertex) {
3616                self.region_locks.release(id);
3617            }
3618            return Err(e);
3619        }
3620
3621        let commit_res = graph.commit_spill_region_atomic_with_fault(
3622            anchor_vertex,
3623            targets.to_vec(),
3624            rows,
3625            None,
3626        );
3627        if let Some(id) = self.active_locks.remove(&anchor_vertex) {
3628            self.region_locks.release(id);
3629        }
3630        commit_res.map(|_| ())
3631    }
3632
3633    /// Commit a spill and mirror all written cells into Arrow overlay via the owning engine.
3634    pub(crate) fn commit_array_with_overlay<R: EvaluationContext>(
3635        &mut self,
3636        engine: &mut Engine<R>,
3637        anchor_vertex: VertexId,
3638        targets: &[CellRef],
3639        rows: Vec<Vec<LiteralValue>>,
3640    ) -> Result<(), ExcelError> {
3641        // Re-run plan on concrete targets before committing to respect blockers.
3642        let plan_res = engine.graph.plan_spill_region(anchor_vertex, targets);
3643        if let Err(e) = plan_res {
3644            if let Some(id) = self.active_locks.remove(&anchor_vertex) {
3645                self.region_locks.release(id);
3646            }
3647            return Err(e);
3648        }
3649
3650        let commit_res = engine.graph.commit_spill_region_atomic_with_fault(
3651            anchor_vertex,
3652            targets.to_vec(),
3653            rows.clone(),
3654            None,
3655        );
3656        if let Some(id) = self.active_locks.remove(&anchor_vertex) {
3657            self.region_locks.release(id);
3658        }
3659        commit_res.map(|_| ())?;
3660
3661        // Mirror into Arrow overlay when enabled
3662        if engine.config.arrow_storage_enabled
3663            && engine.config.delta_overlay_enabled
3664            && engine.config.write_formula_overlay_enabled
3665        {
3666            // Expect targets to be a contiguous rectangle row-major starting at some anchor
3667            for (idx, cell) in targets.iter().enumerate() {
3668                let (r_off, c_off) = {
3669                    if rows.is_empty() || rows[0].is_empty() {
3670                        (0usize, 0usize)
3671                    } else {
3672                        let width = rows[0].len();
3673                        (idx / width, idx % width)
3674                    }
3675                };
3676                let v = rows
3677                    .get(r_off)
3678                    .and_then(|r| r.get(c_off))
3679                    .cloned()
3680                    .unwrap_or(LiteralValue::Empty);
3681                let sheet_name = engine.graph.sheet_name(cell.sheet_id).to_string();
3682                engine.mirror_value_to_overlay(
3683                    &sheet_name,
3684                    cell.coord.row() + 1,
3685                    cell.coord.col() + 1,
3686                    &v,
3687                );
3688            }
3689        }
3690        Ok(())
3691    }
3692}
3693
3694impl<R> Engine<R>
3695where
3696    R: EvaluationContext,
3697{
3698    fn resolve_shared_ref(
3699        &self,
3700        reference: &ReferenceType,
3701        current_sheet: &str,
3702    ) -> Result<formualizer_common::SheetRef<'static>, ExcelError> {
3703        use formualizer_common::{
3704            SheetCellRef as SharedCellRef, SheetLocator, SheetRangeRef as SharedRangeRef,
3705            SheetRef as SharedRef,
3706        };
3707
3708        // Preserve anchor flags from the parsed reference when possible.
3709        let sr = match reference {
3710            ReferenceType::Cell {
3711                sheet,
3712                row,
3713                col,
3714                row_abs,
3715                col_abs,
3716            } => {
3717                let row0 = row
3718                    .checked_sub(1)
3719                    .ok_or_else(|| ExcelError::new(ExcelErrorKind::Ref))?;
3720                let col0 = col
3721                    .checked_sub(1)
3722                    .ok_or_else(|| ExcelError::new(ExcelErrorKind::Ref))?;
3723                let sheet_loc = match sheet.as_deref() {
3724                    Some(name) => SheetLocator::from_name(name),
3725                    None => SheetLocator::Current,
3726                };
3727                let coord = formualizer_common::RelativeCoord::new(row0, col0, *row_abs, *col_abs);
3728                SharedRef::Cell(SharedCellRef::new(sheet_loc, coord))
3729            }
3730            ReferenceType::Range {
3731                sheet,
3732                start_row,
3733                start_col,
3734                end_row,
3735                end_col,
3736                start_row_abs,
3737                start_col_abs,
3738                end_row_abs,
3739                end_col_abs,
3740            } => {
3741                let sheet_loc = match sheet.as_deref() {
3742                    Some(name) => SheetLocator::from_name(name),
3743                    None => SheetLocator::Current,
3744                };
3745                let sr = start_row
3746                    .map(|r| {
3747                        r.checked_sub(1)
3748                            .ok_or_else(|| ExcelError::new(ExcelErrorKind::Ref))
3749                    })
3750                    .transpose()?;
3751                let sc = start_col
3752                    .map(|c| {
3753                        c.checked_sub(1)
3754                            .ok_or_else(|| ExcelError::new(ExcelErrorKind::Ref))
3755                    })
3756                    .transpose()?;
3757                let er = end_row
3758                    .map(|r| {
3759                        r.checked_sub(1)
3760                            .ok_or_else(|| ExcelError::new(ExcelErrorKind::Ref))
3761                    })
3762                    .transpose()?;
3763                let ec = end_col
3764                    .map(|c| {
3765                        c.checked_sub(1)
3766                            .ok_or_else(|| ExcelError::new(ExcelErrorKind::Ref))
3767                    })
3768                    .transpose()?;
3769                let range = SharedRangeRef::from_parts(
3770                    sheet_loc,
3771                    sr.map(|idx| formualizer_common::AxisBound::new(idx, *start_row_abs)),
3772                    sc.map(|idx| formualizer_common::AxisBound::new(idx, *start_col_abs)),
3773                    er.map(|idx| formualizer_common::AxisBound::new(idx, *end_row_abs)),
3774                    ec.map(|idx| formualizer_common::AxisBound::new(idx, *end_col_abs)),
3775                )
3776                .map_err(|_| ExcelError::new(ExcelErrorKind::Ref))?;
3777                SharedRef::Range(range)
3778            }
3779            _ => return Err(ExcelError::new(ExcelErrorKind::Ref)),
3780        };
3781
3782        let current_id = self
3783            .graph
3784            .sheet_id(current_sheet)
3785            .ok_or_else(|| ExcelError::new(ExcelErrorKind::Ref))?;
3786
3787        let resolve_loc = |loc: SheetLocator<'_>| -> Result<SheetLocator<'static>, ExcelError> {
3788            match loc {
3789                SheetLocator::Current => Ok(SheetLocator::Id(current_id)),
3790                SheetLocator::Id(id) => Ok(SheetLocator::Id(id)),
3791                SheetLocator::Name(name) => {
3792                    let n = name.as_ref();
3793                    self.graph
3794                        .sheet_id(n)
3795                        .map(SheetLocator::Id)
3796                        .ok_or_else(|| ExcelError::new(ExcelErrorKind::Ref))
3797                }
3798            }
3799        };
3800
3801        match sr {
3802            SharedRef::Cell(cell) => {
3803                let owned = cell.into_owned();
3804                let sheet = resolve_loc(owned.sheet)?;
3805                Ok(SharedRef::Cell(SharedCellRef::new(sheet, owned.coord)))
3806            }
3807            SharedRef::Range(range) => {
3808                let owned = range.into_owned();
3809                let sheet = resolve_loc(owned.sheet)?;
3810                Ok(SharedRef::Range(SharedRangeRef {
3811                    sheet,
3812                    start_row: owned.start_row,
3813                    start_col: owned.start_col,
3814                    end_row: owned.end_row,
3815                    end_col: owned.end_col,
3816                }))
3817            }
3818        }
3819    }
3820}
3821
3822// Implement the resolver traits for the Engine.
3823// This allows the interpreter to resolve references by querying the engine's graph.
3824impl<R> crate::traits::ReferenceResolver for Engine<R>
3825where
3826    R: EvaluationContext,
3827{
3828    fn resolve_cell_reference(
3829        &self,
3830        sheet: Option<&str>,
3831        row: u32,
3832        col: u32,
3833    ) -> Result<LiteralValue, ExcelError> {
3834        let sheet_name = sheet.unwrap_or_else(|| self.default_sheet_name()); // FIXME: should use formula current-sheet context
3835        // Prefer engine's unified accessor which consults Arrow store for base values
3836        // and falls back to graph for formulas and stored values.
3837        if let Some(v) = self.get_cell_value(sheet_name, row, col) {
3838            Ok(v)
3839        } else {
3840            // Excel semantics: empty cell coerces to 0 in numeric contexts
3841            Ok(LiteralValue::Int(0))
3842        }
3843    }
3844}
3845
3846impl<R> crate::traits::RangeResolver for Engine<R>
3847where
3848    R: EvaluationContext,
3849{
3850    fn resolve_range_reference(
3851        &self,
3852        sheet: Option<&str>,
3853        sr: Option<u32>,
3854        sc: Option<u32>,
3855        er: Option<u32>,
3856        ec: Option<u32>,
3857    ) -> Result<Box<dyn crate::traits::Range>, ExcelError> {
3858        // For now, delegate range resolution to the external resolver.
3859        // A future optimization could be to handle this within the graph.
3860        self.resolver.resolve_range_reference(sheet, sr, sc, er, ec)
3861    }
3862}
3863
3864impl<R> crate::traits::NamedRangeResolver for Engine<R>
3865where
3866    R: EvaluationContext,
3867{
3868    fn resolve_named_range_reference(
3869        &self,
3870        name: &str,
3871    ) -> Result<Vec<Vec<LiteralValue>>, ExcelError> {
3872        self.resolver.resolve_named_range_reference(name)
3873    }
3874}
3875
3876impl<R> crate::traits::TableResolver for Engine<R>
3877where
3878    R: EvaluationContext,
3879{
3880    fn resolve_table_reference(
3881        &self,
3882        tref: &formualizer_parse::parser::TableReference,
3883    ) -> Result<Box<dyn crate::traits::Table>, ExcelError> {
3884        self.resolver.resolve_table_reference(tref)
3885    }
3886}
3887
3888impl<R> crate::traits::SourceResolver for Engine<R>
3889where
3890    R: EvaluationContext,
3891{
3892    fn source_scalar_version(&self, name: &str) -> Option<u64> {
3893        self.resolver.source_scalar_version(name)
3894    }
3895
3896    fn resolve_source_scalar(&self, name: &str) -> Result<LiteralValue, ExcelError> {
3897        self.resolver.resolve_source_scalar(name)
3898    }
3899
3900    fn source_table_version(&self, name: &str) -> Option<u64> {
3901        self.resolver.source_table_version(name)
3902    }
3903
3904    fn resolve_source_table(
3905        &self,
3906        name: &str,
3907    ) -> Result<Box<dyn crate::traits::Table>, ExcelError> {
3908        self.resolver.resolve_source_table(name)
3909    }
3910}
3911
3912// The Engine is a Resolver because it implements the constituent traits.
3913impl<R> crate::traits::Resolver for Engine<R> where R: EvaluationContext {}
3914
3915// The Engine provides functions by delegating to its internal resolver.
3916impl<R> crate::traits::FunctionProvider for Engine<R>
3917where
3918    R: EvaluationContext,
3919{
3920    fn get_function(
3921        &self,
3922        prefix: &str,
3923        name: &str,
3924    ) -> Option<std::sync::Arc<dyn crate::function::Function>> {
3925        self.resolver.get_function(prefix, name)
3926    }
3927}
3928
3929// Override EvaluationContext to provide thread pool access
3930impl<R> crate::traits::EvaluationContext for Engine<R>
3931where
3932    R: EvaluationContext,
3933{
3934    fn thread_pool(&self) -> Option<&Arc<rayon::ThreadPool>> {
3935        self.thread_pool.as_ref()
3936    }
3937
3938    fn cancellation_token(&self) -> Option<Arc<std::sync::atomic::AtomicBool>> {
3939        self.active_cancel_flag.clone()
3940    }
3941
3942    fn chunk_hint(&self) -> Option<usize> {
3943        // Use a simple heuristic from configuration (stripe width * height) as a default hint.
3944        let hint =
3945            (self.config.stripe_height as usize).saturating_mul(self.config.stripe_width as usize);
3946        Some(hint.clamp(1024, 1 << 20)) // clamp between 1K and ~1M
3947    }
3948
3949    fn volatile_level(&self) -> crate::traits::VolatileLevel {
3950        self.config.volatile_level
3951    }
3952
3953    fn workbook_seed(&self) -> u64 {
3954        self.config.workbook_seed
3955    }
3956
3957    fn recalc_epoch(&self) -> u64 {
3958        self.recalc_epoch
3959    }
3960
3961    fn used_rows_for_columns(
3962        &self,
3963        sheet: &str,
3964        start_col: u32,
3965        end_col: u32,
3966    ) -> Option<(u32, u32)> {
3967        // Prefer Arrow-backed used-region; fallback to graph if formulas intersect region
3968        let sheet_id = self.graph.sheet_id(sheet)?;
3969        let arrow_ok = self.sheet_store().sheet(sheet).is_some();
3970        if arrow_ok && let Some(bounds) = self.arrow_used_row_bounds(sheet, start_col, end_col) {
3971            return Some(bounds);
3972        }
3973        let sc0 = start_col.saturating_sub(1);
3974        let ec0 = end_col.saturating_sub(1);
3975        self.graph
3976            .used_row_bounds_for_columns(sheet_id, sc0, ec0)
3977            .map(|(a0, b0)| (a0 + 1, b0 + 1))
3978    }
3979
3980    fn used_cols_for_rows(&self, sheet: &str, start_row: u32, end_row: u32) -> Option<(u32, u32)> {
3981        // Prefer Arrow-backed used-region; fallback to graph if formulas intersect region
3982        let sheet_id = self.graph.sheet_id(sheet)?;
3983        let arrow_ok = self.sheet_store().sheet(sheet).is_some();
3984        if arrow_ok && let Some(bounds) = self.arrow_used_col_bounds(sheet, start_row, end_row) {
3985            return Some(bounds);
3986        }
3987        let sr0 = start_row.saturating_sub(1);
3988        let er0 = end_row.saturating_sub(1);
3989        self.graph
3990            .used_col_bounds_for_rows(sheet_id, sr0, er0)
3991            .map(|(a0, b0)| (a0 + 1, b0 + 1))
3992    }
3993
3994    fn sheet_bounds(&self, sheet: &str) -> Option<(u32, u32)> {
3995        let _ = self.graph.sheet_id(sheet)?;
3996        // Excel-like upper bounds; we expose something finite but large.
3997        // Backends may override with real bounds.
3998        Some((1_048_576, 16_384)) // 1048576 rows, 16384 cols (XFD)
3999    }
4000
4001    fn data_snapshot_id(&self) -> u64 {
4002        self.snapshot_id.load(std::sync::atomic::Ordering::Relaxed)
4003    }
4004
4005    fn backend_caps(&self) -> crate::traits::BackendCaps {
4006        crate::traits::BackendCaps {
4007            streaming: true,
4008            used_region: true,
4009            write: false,
4010            tables: false,
4011            async_stream: false,
4012        }
4013    }
4014
4015    // Flats removed
4016
4017    fn date_system(&self) -> crate::engine::DateSystem {
4018        self.config.date_system
4019    }
4020    /// New: resolve a reference into a RangeView (Phase 2 API)
4021    fn resolve_range_view<'c>(
4022        &'c self,
4023        reference: &ReferenceType,
4024        current_sheet: &str,
4025    ) -> Result<RangeView<'c>, ExcelError> {
4026        match reference {
4027            ReferenceType::External(ext) => {
4028                let name = ext.raw.as_str();
4029                match ext.kind {
4030                    formualizer_parse::parser::ExternalRefKind::Cell { .. } => {
4031                        let Some(source) = self.graph.resolve_source_scalar_entry(name) else {
4032                            return Err(ExcelError::new(ExcelErrorKind::Name)
4033                                .with_message(format!("Undefined name: {name}")));
4034                        };
4035                        let version = source
4036                            .version
4037                            .or_else(|| self.resolver.source_scalar_version(name));
4038                        let v = self.resolve_source_scalar_cached(name, version)?;
4039                        Ok(RangeView::from_owned_rows(
4040                            vec![vec![v]],
4041                            self.config.date_system,
4042                        ))
4043                    }
4044                    formualizer_parse::parser::ExternalRefKind::Range { .. } => {
4045                        let Some(source) = self.graph.resolve_source_table_entry(name) else {
4046                            return Err(ExcelError::new(ExcelErrorKind::Name)
4047                                .with_message(format!("Undefined table: {name}")));
4048                        };
4049                        let version = source
4050                            .version
4051                            .or_else(|| self.resolver.source_table_version(name));
4052                        let table = self.resolve_source_table_cached(name, version)?;
4053                        let spec = Some(formualizer_parse::parser::TableSpecifier::Data);
4054                        self.source_table_to_range_view(table.as_ref(), &spec)
4055                    }
4056                }
4057            }
4058            ReferenceType::Range { .. } => {
4059                let shared = self.resolve_shared_ref(reference, current_sheet)?;
4060                let formualizer_common::SheetRef::Range(range) = shared else {
4061                    return Err(ExcelError::new(ExcelErrorKind::Ref));
4062                };
4063                let sheet_id = match range.sheet {
4064                    formualizer_common::SheetLocator::Id(id) => id,
4065                    _ => return Err(ExcelError::new(ExcelErrorKind::Ref)),
4066                };
4067                let sheet_name = self.graph.sheet_name(sheet_id);
4068
4069                let bounded_range = if range.start_row.is_some()
4070                    && range.start_col.is_some()
4071                    && range.end_row.is_some()
4072                    && range.end_col.is_some()
4073                {
4074                    Some(RangeRef::try_from_shared(range.as_ref())?)
4075                } else {
4076                    None
4077                };
4078
4079                let mut sr = bounded_range
4080                    .as_ref()
4081                    .map(|r| r.start.coord.row() + 1)
4082                    .or_else(|| range.start_row.map(|b| b.index + 1));
4083                let mut sc = bounded_range
4084                    .as_ref()
4085                    .map(|r| r.start.coord.col() + 1)
4086                    .or_else(|| range.start_col.map(|b| b.index + 1));
4087                let mut er = bounded_range
4088                    .as_ref()
4089                    .map(|r| r.end.coord.row() + 1)
4090                    .or_else(|| range.end_row.map(|b| b.index + 1));
4091                let mut ec = bounded_range
4092                    .as_ref()
4093                    .map(|r| r.end.coord.col() + 1)
4094                    .or_else(|| range.end_col.map(|b| b.index + 1));
4095
4096                if sr.is_none() && er.is_none() {
4097                    // Full-column reference: anchor at row 1
4098                    let scv = sc.unwrap_or(1);
4099                    let ecv = ec.unwrap_or(scv);
4100                    sr = Some(1);
4101                    if let Some((_, max_r)) = self.used_rows_for_columns(sheet_name, scv, ecv) {
4102                        er = Some(max_r);
4103                    } else if let Some((max_rows, _)) = self.sheet_bounds(sheet_name) {
4104                        er = Some(self.config.max_open_ended_rows);
4105                    }
4106                }
4107                if sc.is_none() && ec.is_none() {
4108                    // Full-row reference: anchor at column 1
4109                    let srv = sr.unwrap_or(1);
4110                    let erv = er.unwrap_or(srv);
4111                    sc = Some(1);
4112                    if let Some((_, max_c)) = self.used_cols_for_rows(sheet_name, srv, erv) {
4113                        ec = Some(max_c);
4114                    } else if let Some((_, max_cols)) = self.sheet_bounds(sheet_name) {
4115                        ec = Some(self.config.max_open_ended_cols);
4116                    }
4117                }
4118                if sr.is_some() && er.is_none() {
4119                    let scv = sc.unwrap_or(1);
4120                    let ecv = ec.unwrap_or(scv);
4121                    if let Some((_, max_r)) = self.used_rows_for_columns(sheet_name, scv, ecv) {
4122                        er = Some(max_r);
4123                    } else if let Some((max_rows, _)) = self.sheet_bounds(sheet_name) {
4124                        er = Some(self.config.max_open_ended_rows);
4125                    }
4126                }
4127                if er.is_some() && sr.is_none() {
4128                    // Open start: anchor at row 1
4129                    sr = Some(1);
4130                }
4131                if sc.is_some() && ec.is_none() {
4132                    let srv = sr.unwrap_or(1);
4133                    let erv = er.unwrap_or(srv);
4134                    if let Some((_, max_c)) = self.used_cols_for_rows(sheet_name, srv, erv) {
4135                        ec = Some(max_c);
4136                    } else if let Some((_, max_cols)) = self.sheet_bounds(sheet_name) {
4137                        ec = Some(self.config.max_open_ended_cols);
4138                    }
4139                }
4140                if ec.is_some() && sc.is_none() {
4141                    // Open start: anchor at column 1
4142                    sc = Some(1);
4143                }
4144
4145                let sr = sr.unwrap_or(1);
4146                let sc = sc.unwrap_or(1);
4147                let er = er.unwrap_or(sr.saturating_sub(1));
4148                let ec = ec.unwrap_or(sc.saturating_sub(1));
4149
4150                let Some(asheet) = self.sheet_store().sheet(sheet_name) else {
4151                    return Ok(RangeView::from_owned_rows(
4152                        Vec::new(),
4153                        self.config.date_system,
4154                    ));
4155                };
4156
4157                let rv = if er < sr || ec < sc {
4158                    asheet.range_view(1, 1, 0, 0)
4159                } else {
4160                    let sr0 = sr.saturating_sub(1) as usize;
4161                    let sc0 = sc.saturating_sub(1) as usize;
4162                    let er0 = er.saturating_sub(1) as usize;
4163                    let ec0 = ec.saturating_sub(1) as usize;
4164                    asheet.range_view(sr0, sc0, er0, ec0)
4165                };
4166
4167                Ok(rv)
4168            }
4169            ReferenceType::Cell { .. } => {
4170                let shared = self.resolve_shared_ref(reference, current_sheet)?;
4171                let formualizer_common::SheetRef::Cell(cell) = shared else {
4172                    return Err(ExcelError::new(ExcelErrorKind::Ref));
4173                };
4174                let addr = CellRef::try_from_shared(cell)?;
4175                let sheet_id = addr.sheet_id;
4176                let sheet_name = self.graph.sheet_name(sheet_id);
4177                let row = addr.coord.row() + 1;
4178                let col = addr.coord.col() + 1;
4179
4180                if let Some(asheet) = self.sheet_store().sheet(sheet_name) {
4181                    let r0 = row.saturating_sub(1) as usize;
4182                    let c0 = col.saturating_sub(1) as usize;
4183                    let rv = asheet.range_view(r0, c0, r0, c0);
4184                    Ok(rv)
4185                } else {
4186                    let v = self
4187                        .get_cell_value(sheet_name, row, col)
4188                        .unwrap_or(LiteralValue::Empty);
4189                    Ok(RangeView::from_owned_rows(
4190                        vec![vec![v]],
4191                        self.config.date_system,
4192                    ))
4193                }
4194            }
4195            ReferenceType::NamedRange(name) => {
4196                if let Some(current_id) = self.graph.sheet_id(current_sheet)
4197                    && let Some(named) = self.graph.resolve_name_entry(name, current_id)
4198                {
4199                    match &named.definition {
4200                        NamedDefinition::Cell(cell_ref) => {
4201                            let sheet_name = self.graph.sheet_name(cell_ref.sheet_id);
4202                            let asheet = self
4203                                .sheet_store()
4204                                .sheet(sheet_name)
4205                                .expect("Arrow sheet missing for named cell");
4206
4207                            let r0 = cell_ref.coord.row() as usize;
4208                            let c0 = cell_ref.coord.col() as usize;
4209                            let rv = asheet.range_view(r0, c0, r0, c0);
4210                            return Ok(rv);
4211                        }
4212                        NamedDefinition::Range(range_ref) => {
4213                            let sheet_name = self.graph.sheet_name(range_ref.start.sheet_id);
4214                            let asheet = self
4215                                .sheet_store()
4216                                .sheet(sheet_name)
4217                                .expect("Arrow sheet missing for named range");
4218
4219                            let sr0 = range_ref.start.coord.row() as usize;
4220                            let sc0 = range_ref.start.coord.col() as usize;
4221                            let er0 = range_ref.end.coord.row() as usize;
4222                            let ec0 = range_ref.end.coord.col() as usize;
4223                            let rv = asheet.range_view(sr0, sc0, er0, ec0);
4224                            return Ok(rv);
4225                        }
4226                        NamedDefinition::Formula { .. } => {
4227                            if let Some(value) = self.graph.get_value(named.vertex) {
4228                                return Ok(RangeView::from_owned_rows(
4229                                    vec![vec![value]],
4230                                    self.config.date_system,
4231                                ));
4232                            }
4233                        }
4234                    }
4235                }
4236
4237                if let Some(source) = self.graph.resolve_source_scalar_entry(name) {
4238                    let version = source
4239                        .version
4240                        .or_else(|| self.resolver.source_scalar_version(name));
4241                    let v = self.resolve_source_scalar_cached(name, version)?;
4242                    return Ok(RangeView::from_owned_rows(
4243                        vec![vec![v]],
4244                        self.config.date_system,
4245                    ));
4246                }
4247
4248                let data = self.resolver.resolve_named_range_reference(name)?;
4249                Ok(RangeView::from_owned_rows(data, self.config.date_system))
4250            }
4251            ReferenceType::Table(tref) => {
4252                if let Some(table) = self.graph.resolve_table_entry(&tref.name) {
4253                    let sheet_name = self.graph.sheet_name(table.range.start.sheet_id);
4254                    let asheet = self
4255                        .sheet_store()
4256                        .sheet(sheet_name)
4257                        .expect("Arrow sheet missing for table reference");
4258
4259                    let sr0 = table.range.start.coord.row() as usize;
4260                    let sc0 = table.range.start.coord.col() as usize;
4261                    let er0 = table.range.end.coord.row() as usize;
4262                    let ec0 = table.range.end.coord.col() as usize;
4263
4264                    let has_totals = table.totals_row;
4265                    let data_sr = sr0.saturating_add(1);
4266                    let data_er = if has_totals {
4267                        er0.saturating_sub(1)
4268                    } else {
4269                        er0
4270                    };
4271
4272                    let select = |sr: usize, sc: usize, er: usize, ec: usize| {
4273                        if sr > er || sc > ec {
4274                            asheet.range_view(1, 1, 0, 0)
4275                        } else {
4276                            asheet.range_view(sr, sc, er, ec)
4277                        }
4278                    };
4279
4280                    let av = match &tref.specifier {
4281                        None => {
4282                            return Err(ExcelError::new(ExcelErrorKind::NImpl).with_message(
4283                                "Table reference without specifier is unsupported".to_string(),
4284                            ));
4285                        }
4286                        Some(formualizer_parse::parser::TableSpecifier::Column(col)) => {
4287                            let Some(idx) = table.col_index(col) else {
4288                                return Err(ExcelError::new(ExcelErrorKind::Ref).with_message(
4289                                    "Column refers to unknown table column".to_string(),
4290                                ));
4291                            };
4292                            let c0 = sc0 + idx;
4293                            select(data_sr, c0, data_er, c0)
4294                        }
4295                        Some(formualizer_parse::parser::TableSpecifier::ColumnRange(
4296                            start,
4297                            end,
4298                        )) => {
4299                            let Some(si) = table.col_index(start) else {
4300                                return Err(ExcelError::new(ExcelErrorKind::Ref).with_message(
4301                                    "Column range refers to unknown column(s)".to_string(),
4302                                ));
4303                            };
4304                            let Some(ei) = table.col_index(end) else {
4305                                return Err(ExcelError::new(ExcelErrorKind::Ref).with_message(
4306                                    "Column range refers to unknown column(s)".to_string(),
4307                                ));
4308                            };
4309                            let (mut a, mut b) = (si, ei);
4310                            if a > b {
4311                                std::mem::swap(&mut a, &mut b);
4312                            }
4313                            let c_start = sc0 + a;
4314                            let c_end = sc0 + b;
4315                            select(data_sr, c_start, data_er, c_end)
4316                        }
4317                        Some(formualizer_parse::parser::TableSpecifier::All)
4318                        | Some(formualizer_parse::parser::TableSpecifier::SpecialItem(
4319                            formualizer_parse::parser::SpecialItem::All,
4320                        )) => select(sr0, sc0, er0, ec0),
4321                        Some(formualizer_parse::parser::TableSpecifier::Data)
4322                        | Some(formualizer_parse::parser::TableSpecifier::SpecialItem(
4323                            formualizer_parse::parser::SpecialItem::Data,
4324                        )) => select(data_sr, sc0, data_er, ec0),
4325                        Some(formualizer_parse::parser::TableSpecifier::Headers)
4326                        | Some(formualizer_parse::parser::TableSpecifier::SpecialItem(
4327                            formualizer_parse::parser::SpecialItem::Headers,
4328                        )) => select(sr0, sc0, sr0, ec0),
4329                        Some(formualizer_parse::parser::TableSpecifier::Totals)
4330                        | Some(formualizer_parse::parser::TableSpecifier::SpecialItem(
4331                            formualizer_parse::parser::SpecialItem::Totals,
4332                        )) => {
4333                            if !has_totals {
4334                                asheet.range_view(1, 1, 0, 0)
4335                            } else {
4336                                select(er0, sc0, er0, ec0)
4337                            }
4338                        }
4339                        Some(formualizer_parse::parser::TableSpecifier::SpecialItem(
4340                            formualizer_parse::parser::SpecialItem::ThisRow,
4341                        )) => {
4342                            return Err(ExcelError::new(ExcelErrorKind::NImpl).with_message(
4343                                "@ (This Row) requires table-aware context; not yet supported"
4344                                    .to_string(),
4345                            ));
4346                        }
4347                        Some(formualizer_parse::parser::TableSpecifier::Row(_))
4348                        | Some(formualizer_parse::parser::TableSpecifier::Combination(_)) => {
4349                            return Err(ExcelError::new(ExcelErrorKind::NImpl).with_message(
4350                                "Complex structured references not yet supported".to_string(),
4351                            ));
4352                        }
4353                    };
4354
4355                    let rv = asheet.range_view(sr0, sc0, er0, ec0);
4356                    return Ok(rv);
4357                }
4358
4359                if let Some(source) = self.graph.resolve_source_table_entry(&tref.name) {
4360                    let version = source
4361                        .version
4362                        .or_else(|| self.resolver.source_table_version(&tref.name));
4363                    let table = self.resolve_source_table_cached(&tref.name, version)?;
4364                    return self.source_table_to_range_view(table.as_ref(), &tref.specifier);
4365                }
4366
4367                // Fallback: materialize via Resolver::resolve_range_like tranche 1
4368                let boxed = self.resolve_range_like(&ReferenceType::Table(tref.clone()))?;
4369                let owned = boxed.materialise().into_owned();
4370                Ok(RangeView::from_owned_rows(owned, self.config.date_system))
4371            }
4372        }
4373    }
4374
4375    fn build_criteria_mask(
4376        &self,
4377        view: &RangeView<'_>,
4378        col_in_view: usize,
4379        pred: &crate::args::CriteriaPredicate,
4380    ) -> Option<std::sync::Arc<arrow_array::BooleanArray>> {
4381        if view.dims().1 == 0 {
4382            return None;
4383        }
4384        // If the view is logically open-ended but the backing sheet has no physical rows,
4385        // treat the mask as empty (0-len) rather than attempting to build a huge mask.
4386        let sheet_rows = view.sheet().nrows as usize;
4387        if sheet_rows == 0 || view.start_row() >= sheet_rows {
4388            return Some(std::sync::Arc::new(arrow_array::BooleanArray::new_null(0)));
4389        }
4390        compute_criteria_mask(view, col_in_view, pred)
4391    }
4392}
4393
4394impl<R> Engine<R>
4395where
4396    R: EvaluationContext,
4397{
4398    /// Helper: commit spill via shim and mirror resulting cells into Arrow overlay when enabled.
4399    fn commit_spill_and_mirror(
4400        &mut self,
4401        anchor_vertex: VertexId,
4402        targets: &[CellRef],
4403        rows: Vec<Vec<LiteralValue>>,
4404        delta: Option<&mut DeltaCollector>,
4405    ) -> Result<(), ExcelError> {
4406        let prev_spill_cells = self
4407            .graph
4408            .spill_cells_for_anchor(anchor_vertex)
4409            .map(|cells| cells.to_vec())
4410            .unwrap_or_default();
4411
4412        if let Some(delta) = delta
4413            && delta.mode != DeltaMode::Off
4414        {
4415            let target_set: FxHashSet<CellRef> = targets.iter().copied().collect();
4416            let empty = LiteralValue::Empty;
4417
4418            // Clears (prev - targets)
4419            for cell in prev_spill_cells.iter() {
4420                if target_set.contains(cell) {
4421                    continue;
4422                }
4423                let sheet_name = self.graph.sheet_name(cell.sheet_id);
4424                let old = self
4425                    .get_cell_value(sheet_name, cell.coord.row() + 1, cell.coord.col() + 1)
4426                    .unwrap_or(LiteralValue::Empty);
4427                if old != empty {
4428                    delta.record_cell(cell.sheet_id, cell.coord.row(), cell.coord.col());
4429                }
4430            }
4431
4432            // Writes (targets)
4433            if !targets.is_empty() && !rows.is_empty() && !rows[0].is_empty() {
4434                let width = rows[0].len();
4435                for (idx, cell) in targets.iter().enumerate() {
4436                    let r_off = idx / width;
4437                    let c_off = idx % width;
4438                    let new = rows
4439                        .get(r_off)
4440                        .and_then(|r| r.get(c_off))
4441                        .cloned()
4442                        .unwrap_or(LiteralValue::Empty);
4443                    let sheet_name = self.graph.sheet_name(cell.sheet_id);
4444                    let old = self
4445                        .get_cell_value(sheet_name, cell.coord.row() + 1, cell.coord.col() + 1)
4446                        .unwrap_or(LiteralValue::Empty);
4447                    if old != new {
4448                        delta.record_cell(cell.sheet_id, cell.coord.row(), cell.coord.col());
4449                    }
4450                }
4451            } else {
4452                // Degenerate shapes: if we have targets but no rows, treat as writing Empty.
4453                for cell in targets.iter() {
4454                    let sheet_name = self.graph.sheet_name(cell.sheet_id);
4455                    let old = self
4456                        .get_cell_value(sheet_name, cell.coord.row() + 1, cell.coord.col() + 1)
4457                        .unwrap_or(LiteralValue::Empty);
4458                    if !matches!(old, LiteralValue::Empty) {
4459                        delta.record_cell(cell.sheet_id, cell.coord.row(), cell.coord.col());
4460                    }
4461                }
4462            }
4463        }
4464
4465        // Commit via shim (releases locks)
4466        self.spill_mgr
4467            .commit_array(&mut self.graph, anchor_vertex, targets, rows.clone())?;
4468
4469        if self.config.arrow_storage_enabled
4470            && self.config.delta_overlay_enabled
4471            && self.config.write_formula_overlay_enabled
4472        {
4473            if !prev_spill_cells.is_empty() {
4474                let target_set: FxHashSet<CellRef> = targets.iter().copied().collect();
4475                let empty = LiteralValue::Empty;
4476                for cell in prev_spill_cells.iter() {
4477                    if !target_set.contains(cell) {
4478                        let sheet_name = self.graph.sheet_name(cell.sheet_id).to_string();
4479                        self.mirror_value_to_overlay(
4480                            &sheet_name,
4481                            cell.coord.row() + 1,
4482                            cell.coord.col() + 1,
4483                            &empty,
4484                        );
4485                    }
4486                }
4487            }
4488
4489            for (idx, cell) in targets.iter().enumerate() {
4490                if rows.is_empty() || rows[0].is_empty() {
4491                    break;
4492                }
4493                let width = rows[0].len();
4494                let r_off = idx / width;
4495                let c_off = idx % width;
4496                let v = rows[r_off][c_off].clone();
4497                let sheet_name = self.graph.sheet_name(cell.sheet_id).to_string();
4498                self.mirror_value_to_overlay(
4499                    &sheet_name,
4500                    cell.coord.row() + 1,
4501                    cell.coord.col() + 1,
4502                    &v,
4503                );
4504            }
4505        }
4506        Ok(())
4507    }
4508}