formualizer_eval/engine/
eval.rs

1use crate::SheetId;
2use crate::arrow_store::SheetStore;
3use crate::engine::pass_planner::PassPlanner;
4use crate::engine::range_view::RangeView;
5use crate::engine::spill::{RegionLockManager, SpillMeta, SpillShape};
6use crate::engine::warmup::{PassContext, WarmupExecutor};
7use crate::engine::{DependencyGraph, EvalConfig, Scheduler, VertexId, VertexKind};
8use crate::interpreter::Interpreter;
9use crate::reference::{CellRef, Coord};
10use crate::traits::FunctionProvider;
11use crate::traits::{EvaluationContext, Resolver};
12use chrono::Timelike;
13use formualizer_parse::parser::ReferenceType;
14use formualizer_parse::{ASTNode, ExcelError, ExcelErrorKind, LiteralValue};
15use rayon::ThreadPoolBuilder;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, Ordering};
18
19pub struct Engine<R> {
20    pub graph: DependencyGraph,
21    resolver: R,
22    pub config: EvalConfig,
23    thread_pool: Option<Arc<rayon::ThreadPool>>,
24    pub recalc_epoch: u64,
25    snapshot_id: std::sync::atomic::AtomicU64,
26    spill_mgr: ShimSpillManager,
27    /// Optional pass-scoped warmup context (flats/masks) available during evaluation
28    warmup_pass_ctx: Option<PassContext>,
29    /// Arrow-backed storage for sheet values (Phase A)
30    arrow_sheets: SheetStore,
31    /// True if any edit after bulk load; disables Arrow reads for parity
32    has_edited: bool,
33    /// Overlay compaction counter (Phase C instrumentation)
34    overlay_compactions: u64,
35    // Pass-scoped cache for Arrow used-row bounds per column
36    row_bounds_cache: std::sync::RwLock<Option<RowBoundsCache>>,
37    // Pass-scoped criteria mask cache (Arrow-only)
38    mask_cache: std::sync::RwLock<Option<MaskCache>>,
39    /// Staged formulas by sheet when `defer_graph_building` is enabled.
40    staged_formulas: std::collections::HashMap<String, Vec<(u32, u32, String)>>,
41}
42
43#[derive(Debug)]
44pub struct EvalResult {
45    pub computed_vertices: usize,
46    pub cycle_errors: usize,
47    pub elapsed: std::time::Duration,
48}
49
50#[derive(Debug, Clone)]
51pub struct LayerInfo {
52    pub vertex_count: usize,
53    pub parallel_eligible: bool,
54    pub sample_cells: Vec<String>, // Sample of up to 5 cell addresses
55}
56
57#[derive(Debug, Clone)]
58pub struct EvalPlan {
59    pub total_vertices_to_evaluate: usize,
60    pub layers: Vec<LayerInfo>,
61    pub cycles_detected: usize,
62    pub dirty_count: usize,
63    pub volatile_count: usize,
64    pub parallel_enabled: bool,
65    pub estimated_parallel_layers: usize,
66    pub target_cells: Vec<String>,
67}
68
69impl<R> Engine<R>
70where
71    R: EvaluationContext,
72{
73    pub fn new(resolver: R, config: EvalConfig) -> Self {
74        crate::builtins::load_builtins();
75
76        // Initialize thread pool based on config
77        let thread_pool = if config.enable_parallel {
78            let mut builder = ThreadPoolBuilder::new();
79            if let Some(max_threads) = config.max_threads {
80                builder = builder.num_threads(max_threads);
81            }
82
83            match builder.build() {
84                Ok(pool) => Some(Arc::new(pool)),
85                Err(_) => {
86                    // Fall back to sequential evaluation if thread pool creation fails
87                    None
88                }
89            }
90        } else {
91            None
92        };
93
94        Self {
95            graph: DependencyGraph::new_with_config(config.clone()),
96            resolver,
97            config,
98            thread_pool,
99            recalc_epoch: 0,
100            snapshot_id: std::sync::atomic::AtomicU64::new(1),
101            spill_mgr: ShimSpillManager::default(),
102            warmup_pass_ctx: None,
103            arrow_sheets: SheetStore::default(),
104            has_edited: false,
105            overlay_compactions: 0,
106            row_bounds_cache: std::sync::RwLock::new(None),
107            mask_cache: std::sync::RwLock::new(None),
108            staged_formulas: std::collections::HashMap::new(),
109        }
110    }
111
112    /// Create an Engine with a custom thread pool (for shared thread pool scenarios)
113    pub fn with_thread_pool(
114        resolver: R,
115        config: EvalConfig,
116        thread_pool: Arc<rayon::ThreadPool>,
117    ) -> Self {
118        crate::builtins::load_builtins();
119        Self {
120            graph: DependencyGraph::new_with_config(config.clone()),
121            resolver,
122            config,
123            thread_pool: Some(thread_pool),
124            recalc_epoch: 0,
125            snapshot_id: std::sync::atomic::AtomicU64::new(1),
126            spill_mgr: ShimSpillManager::default(),
127            warmup_pass_ctx: None,
128            arrow_sheets: SheetStore::default(),
129            has_edited: false,
130            overlay_compactions: 0,
131            row_bounds_cache: std::sync::RwLock::new(None),
132            mask_cache: std::sync::RwLock::new(None),
133            staged_formulas: std::collections::HashMap::new(),
134        }
135    }
136
137    pub fn default_sheet_id(&self) -> SheetId {
138        self.graph.default_sheet_id()
139    }
140
141    pub fn default_sheet_name(&self) -> &str {
142        self.graph.default_sheet_name()
143    }
144
145    /// Update the workbook seed for deterministic RNGs in functions.
146    pub fn set_workbook_seed(&mut self, seed: u64) {
147        self.config.workbook_seed = seed;
148    }
149
150    /// Set the volatile level policy (Always/OnRecalc/OnOpen)
151    pub fn set_volatile_level(&mut self, level: crate::traits::VolatileLevel) {
152        self.config.volatile_level = level;
153    }
154
155    pub fn sheet_id(&self, name: &str) -> Option<SheetId> {
156        self.graph.sheet_id(name)
157    }
158
159    pub fn set_default_sheet_by_name(&mut self, name: &str) {
160        self.graph.set_default_sheet_by_name(name);
161    }
162
163    pub fn set_default_sheet_by_id(&mut self, id: SheetId) {
164        self.graph.set_default_sheet_by_id(id);
165    }
166
167    pub fn set_sheet_index_mode(&mut self, mode: crate::engine::SheetIndexMode) {
168        self.graph.set_sheet_index_mode(mode);
169    }
170
171    /// Mark data edited: bump snapshot and set edited flag
172    pub fn mark_data_edited(&mut self) {
173        self.snapshot_id
174            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
175        self.has_edited = true;
176    }
177
178    /// Access Arrow sheet store (read-only)
179    pub fn sheet_store(&self) -> &SheetStore {
180        &self.arrow_sheets
181    }
182
183    /// Access Arrow sheet store (mutable)
184    pub fn sheet_store_mut(&mut self) -> &mut SheetStore {
185        &mut self.arrow_sheets
186    }
187
188    /// Stage a formula text instead of inserting into the graph (used when deferring is enabled).
189    pub fn stage_formula_text(&mut self, sheet: &str, row: u32, col: u32, text: String) {
190        self.staged_formulas
191            .entry(sheet.to_string())
192            .or_default()
193            .push((row, col, text));
194    }
195
196    /// Get a staged formula text for a given cell if present (cloned).
197    pub fn get_staged_formula_text(&self, sheet: &str, row: u32, col: u32) -> Option<String> {
198        self.staged_formulas.get(sheet).and_then(|v| {
199            v.iter()
200                .find(|(r, c, _)| *r == row && *c == col)
201                .map(|(_, _, s)| s.clone())
202        })
203    }
204
205    /// Build graph for all staged formulas.
206    pub fn build_graph_all(&mut self) -> Result<(), formualizer_parse::ExcelError> {
207        if self.staged_formulas.is_empty() {
208            return Ok(());
209        }
210        // Take staged formulas before borrowing graph via builder
211        let staged = std::mem::take(&mut self.staged_formulas);
212        let mut builder = self.begin_bulk_ingest();
213        for (sheet, entries) in staged {
214            let sid = builder.add_sheet(&sheet);
215            // Parse with small cache for shared formulas
216            let mut cache: rustc_hash::FxHashMap<String, formualizer_parse::ASTNode> =
217                rustc_hash::FxHashMap::default();
218            cache.reserve(4096);
219            for (row, col, txt) in entries {
220                let key = if txt.starts_with('=') {
221                    txt
222                } else {
223                    format!("={txt}")
224                };
225                let ast = if let Some(p) = cache.get(&key) {
226                    p.clone()
227                } else {
228                    let parsed = formualizer_parse::parser::parse(&key).map_err(|e| {
229                        formualizer_parse::ExcelError::new(formualizer_parse::ExcelErrorKind::Value)
230                            .with_message(e.to_string())
231                    })?;
232                    cache.insert(key.clone(), parsed.clone());
233                    parsed
234                };
235                builder.add_formulas(sid, std::iter::once((row, col, ast)));
236            }
237        }
238        let _ = builder.finish();
239        Ok(())
240    }
241
242    /// Build graph for specific sheets (consuming only those staged entries).
243    pub fn build_graph_for_sheets<'a, I: IntoIterator<Item = &'a str>>(
244        &mut self,
245        sheets: I,
246    ) -> Result<(), formualizer_parse::ExcelError> {
247        let mut any = false;
248        // Collect entries first to avoid aliasing self while builder borrows graph
249        struct SheetFormulas<'s> {
250            sheet: &'s str,
251            entries: Vec<(u32, u32, String)>,
252        }
253        let mut collected: Vec<SheetFormulas<'_>> = Vec::new();
254        for s in sheets {
255            if let Some(entries) = self.staged_formulas.remove(s) {
256                collected.push(SheetFormulas { sheet: s, entries });
257            }
258        }
259        let mut builder = self.begin_bulk_ingest();
260        // small parse cache per call
261        let mut cache: rustc_hash::FxHashMap<String, formualizer_parse::ASTNode> =
262            rustc_hash::FxHashMap::default();
263        cache.reserve(4096);
264        for SheetFormulas { sheet: s, entries } in collected.into_iter() {
265            any = true;
266            let sid = builder.add_sheet(s);
267            for (row, col, txt) in entries {
268                let key = if txt.starts_with('=') {
269                    txt
270                } else {
271                    format!("={txt}")
272                };
273                let ast = if let Some(p) = cache.get(&key) {
274                    p.clone()
275                } else {
276                    let parsed = formualizer_parse::parser::parse(&key).map_err(|e| {
277                        formualizer_parse::ExcelError::new(formualizer_parse::ExcelErrorKind::Value)
278                            .with_message(e.to_string())
279                    })?;
280                    cache.insert(key.clone(), parsed.clone());
281                    parsed
282                };
283                builder.add_formulas(sid, std::iter::once((row, col, ast)));
284            }
285        }
286        if any {
287            let _ = builder.finish();
288        }
289        Ok(())
290    }
291
292    /// Begin bulk Arrow ingest for base values (Phase A)
293    pub fn begin_bulk_ingest_arrow(
294        &mut self,
295    ) -> crate::engine::arrow_ingest::ArrowBulkIngestBuilder<'_, R> {
296        crate::engine::arrow_ingest::ArrowBulkIngestBuilder::new(self)
297    }
298
299    /// Begin bulk updates to Arrow store (Phase C)
300    pub fn begin_bulk_update_arrow(
301        &mut self,
302    ) -> crate::engine::arrow_ingest::ArrowBulkUpdateBuilder<'_, R> {
303        crate::engine::arrow_ingest::ArrowBulkUpdateBuilder::new(self)
304    }
305
306    /// Insert rows (1-based) and mirror into Arrow store when enabled
307    pub fn insert_rows(
308        &mut self,
309        sheet: &str,
310        before: u32,
311        count: u32,
312    ) -> Result<crate::engine::graph::editor::vertex_editor::ShiftSummary, crate::engine::EditorError>
313    {
314        use crate::engine::graph::editor::vertex_editor::VertexEditor;
315        let sheet_id = self.graph.sheet_id(sheet).ok_or(
316            crate::engine::graph::editor::vertex_editor::EditorError::InvalidName {
317                name: sheet.to_string(),
318                reason: "Unknown sheet".to_string(),
319            },
320        )?;
321        let mut editor = VertexEditor::new(&mut self.graph);
322        let summary = editor.insert_rows(sheet_id, before, count)?;
323        if let Some(asheet) = self.arrow_sheets.sheet_mut(sheet) {
324            let before0 = before.saturating_sub(1) as usize;
325            asheet.insert_rows(before0, count as usize);
326        }
327        self.snapshot_id
328            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
329        self.has_edited = true;
330        Ok(summary)
331    }
332
333    /// Delete rows (1-based) and mirror into Arrow store when enabled
334    pub fn delete_rows(
335        &mut self,
336        sheet: &str,
337        start: u32,
338        count: u32,
339    ) -> Result<crate::engine::graph::editor::vertex_editor::ShiftSummary, crate::engine::EditorError>
340    {
341        use crate::engine::graph::editor::vertex_editor::VertexEditor;
342        let sheet_id = self.graph.sheet_id(sheet).ok_or(
343            crate::engine::graph::editor::vertex_editor::EditorError::InvalidName {
344                name: sheet.to_string(),
345                reason: "Unknown sheet".to_string(),
346            },
347        )?;
348        let mut editor = VertexEditor::new(&mut self.graph);
349        let summary = editor.delete_rows(sheet_id, start, count)?;
350        if let Some(asheet) = self.arrow_sheets.sheet_mut(sheet) {
351            let start0 = start.saturating_sub(1) as usize;
352            asheet.delete_rows(start0, count as usize);
353        }
354        self.snapshot_id
355            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
356        self.has_edited = true;
357        Ok(summary)
358    }
359
360    /// Insert columns (1-based) and mirror into Arrow store when enabled
361    pub fn insert_columns(
362        &mut self,
363        sheet: &str,
364        before: u32,
365        count: u32,
366    ) -> Result<crate::engine::graph::editor::vertex_editor::ShiftSummary, crate::engine::EditorError>
367    {
368        use crate::engine::graph::editor::vertex_editor::VertexEditor;
369        let sheet_id = self.graph.sheet_id(sheet).ok_or(
370            crate::engine::graph::editor::vertex_editor::EditorError::InvalidName {
371                name: sheet.to_string(),
372                reason: "Unknown sheet".to_string(),
373            },
374        )?;
375        let mut editor = VertexEditor::new(&mut self.graph);
376        let summary = editor.insert_columns(sheet_id, before, count)?;
377        if let Some(asheet) = self.arrow_sheets.sheet_mut(sheet) {
378            let before0 = before.saturating_sub(1) as usize;
379            asheet.insert_columns(before0, count as usize);
380        }
381        self.snapshot_id
382            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
383        self.has_edited = true;
384        Ok(summary)
385    }
386
387    /// Delete columns (1-based) and mirror into Arrow store when enabled
388    pub fn delete_columns(
389        &mut self,
390        sheet: &str,
391        start: u32,
392        count: u32,
393    ) -> Result<crate::engine::graph::editor::vertex_editor::ShiftSummary, crate::engine::EditorError>
394    {
395        use crate::engine::graph::editor::vertex_editor::VertexEditor;
396        let sheet_id = self.graph.sheet_id(sheet).ok_or(
397            crate::engine::graph::editor::vertex_editor::EditorError::InvalidName {
398                name: sheet.to_string(),
399                reason: "Unknown sheet".to_string(),
400            },
401        )?;
402        let mut editor = VertexEditor::new(&mut self.graph);
403        let summary = editor.delete_columns(sheet_id, start, count)?;
404        if let Some(asheet) = self.arrow_sheets.sheet_mut(sheet) {
405            let start0 = start.saturating_sub(1) as usize;
406            asheet.delete_columns(start0, count as usize);
407        }
408        self.snapshot_id
409            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
410        self.has_edited = true;
411        Ok(summary)
412    }
413    /// Arrow-backed used row bounds across a column span (1-based inclusive cols).
414    fn arrow_used_row_bounds(
415        &self,
416        sheet: &str,
417        start_col: u32,
418        end_col: u32,
419    ) -> Option<(u32, u32)> {
420        let a = self.sheet_store().sheet(sheet)?;
421        if a.columns.is_empty() {
422            return None;
423        }
424        let sc0 = start_col.saturating_sub(1) as usize;
425        let ec0 = end_col.saturating_sub(1) as usize;
426        let col_hi = a.columns.len().saturating_sub(1);
427        if sc0 > col_hi {
428            return None;
429        }
430        let ec0 = ec0.min(col_hi);
431        // Pass-scoped cache with snapshot guard
432        let snap = self.data_snapshot_id();
433        let mut min_r0: Option<usize> = None;
434        for ci in sc0..=ec0 {
435            let sheet_id = self.graph.sheet_id(sheet)?;
436            if let Some((Some(mv), _)) = self.row_bounds_cache.read().ok().and_then(|g| {
437                g.as_ref()
438                    .and_then(|c| c.get_row_bounds(sheet_id, ci, snap))
439            }) {
440                let mv = mv as usize;
441                min_r0 = Some(min_r0.map(|m| m.min(mv)).unwrap_or(mv));
442                continue;
443            }
444            // Compute and store
445            let (min_c, max_c) = Self::scan_column_used_bounds(a, ci);
446            if let Ok(mut g) = self.row_bounds_cache.write() {
447                g.get_or_insert_with(|| RowBoundsCache::new(snap))
448                    .put_row_bounds(sheet_id, ci, snap, (min_c, max_c));
449            }
450            if let Some(m) = min_c {
451                min_r0 = Some(min_r0.map(|mm| mm.min(m as usize)).unwrap_or(m as usize));
452            }
453        }
454        min_r0?;
455        let mut max_r0: Option<usize> = None;
456        for ci in sc0..=ec0 {
457            let sheet_id = self.graph.sheet_id(sheet)?;
458            if let Some((_, Some(mv))) = self.row_bounds_cache.read().ok().and_then(|g| {
459                g.as_ref()
460                    .and_then(|c| c.get_row_bounds(sheet_id, ci, snap))
461            }) {
462                let mv = mv as usize;
463                max_r0 = Some(max_r0.map(|m| m.max(mv)).unwrap_or(mv));
464                continue;
465            }
466            let (_min_c, max_c) = Self::scan_column_used_bounds(a, ci);
467            if let Ok(mut g) = self.row_bounds_cache.write() {
468                g.get_or_insert_with(|| RowBoundsCache::new(snap))
469                    .put_row_bounds(sheet_id, ci, snap, (_min_c, max_c));
470            }
471            if let Some(m) = max_c {
472                max_r0 = Some(max_r0.map(|mm| mm.max(m as usize)).unwrap_or(m as usize));
473            }
474        }
475        match (min_r0, max_r0) {
476            (Some(a0), Some(b0)) => Some(((a0 as u32) + 1, (b0 as u32) + 1)),
477            _ => None,
478        }
479    }
480
481    fn scan_column_used_bounds(
482        a: &crate::arrow_store::ArrowSheet,
483        ci: usize,
484    ) -> (Option<u32>, Option<u32>) {
485        let col = &a.columns[ci];
486        // Min
487        let mut min_r0: Option<u32> = None;
488        for (chunk_idx, chunk) in col.chunks.iter().enumerate() {
489            let tags = chunk.type_tag.values();
490            for (off, &t) in tags.iter().enumerate() {
491                let overlay_non_empty = chunk
492                    .overlay
493                    .get(off)
494                    .map(|ov| !matches!(ov, crate::arrow_store::OverlayValue::Empty))
495                    .unwrap_or(false);
496                if overlay_non_empty || t != crate::arrow_store::TypeTag::Empty as u8 {
497                    let row0 = a.chunk_starts[chunk_idx] + off;
498                    min_r0 = Some(row0 as u32);
499                    break;
500                }
501            }
502            if min_r0.is_some() {
503                break;
504            }
505        }
506        // Max
507        let mut max_r0: Option<u32> = None;
508        for (chunk_rel, chunk) in col.chunks.iter().enumerate().rev() {
509            let chunk_idx = chunk_rel;
510            let tags = chunk.type_tag.values();
511            for (rev_idx, &t) in tags.iter().enumerate().rev() {
512                let overlay_non_empty = chunk
513                    .overlay
514                    .get(rev_idx)
515                    .map(|ov| !matches!(ov, crate::arrow_store::OverlayValue::Empty))
516                    .unwrap_or(false);
517                if overlay_non_empty || t != crate::arrow_store::TypeTag::Empty as u8 {
518                    let row0 = a.chunk_starts[chunk_idx] + rev_idx;
519                    max_r0 = Some(row0 as u32);
520                    break;
521                }
522            }
523            if max_r0.is_some() {
524                break;
525            }
526        }
527        (min_r0, max_r0)
528    }
529
530    /// Arrow-backed used column bounds across a row span (1-based inclusive rows).
531    fn arrow_used_col_bounds(
532        &self,
533        sheet: &str,
534        start_row: u32,
535        end_row: u32,
536    ) -> Option<(u32, u32)> {
537        let a = self.sheet_store().sheet(sheet)?;
538        if a.columns.is_empty() {
539            return None;
540        }
541        let sr0 = start_row.saturating_sub(1) as usize;
542        let er0 = end_row.saturating_sub(1) as usize;
543        if sr0 > er0 {
544            return None;
545        }
546        // Map start/end rows into chunk ranges
547        // We will scan each column for any non-empty within [sr0..=er0]
548        let mut min_c0: Option<usize> = None;
549        let mut max_c0: Option<usize> = None;
550        // Precompute chunk bounds for row range
551        for (ci, col) in a.columns.iter().enumerate() {
552            // Skip columns with no chunks (shouldn't happen)
553            if col.chunks.is_empty() {
554                continue;
555            }
556            let mut any_in_range = false;
557            for (chunk_idx, chunk) in col.chunks.iter().enumerate() {
558                let chunk_start = a.chunk_starts[chunk_idx];
559                let chunk_len = chunk.type_tag.len();
560                let chunk_end = chunk_start + chunk_len.saturating_sub(1);
561                // check intersection
562                if sr0 > chunk_end || er0 < chunk_start {
563                    continue;
564                }
565                let start_off = sr0.max(chunk_start) - chunk_start;
566                let end_off = er0.min(chunk_end) - chunk_start;
567                let tags = chunk.type_tag.values();
568                for off in start_off..=end_off {
569                    let overlay_non_empty = chunk
570                        .overlay
571                        .get(off)
572                        .map(|ov| !matches!(ov, crate::arrow_store::OverlayValue::Empty))
573                        .unwrap_or(false);
574                    if overlay_non_empty || tags[off] != crate::arrow_store::TypeTag::Empty as u8 {
575                        any_in_range = true;
576                        break;
577                    }
578                }
579                if any_in_range {
580                    break;
581                }
582            }
583            if any_in_range {
584                min_c0 = Some(min_c0.map(|m| m.min(ci)).unwrap_or(ci));
585                max_c0 = Some(max_c0.map(|m| m.max(ci)).unwrap_or(ci));
586            }
587        }
588        match (min_c0, max_c0) {
589            (Some(a0), Some(b0)) => Some(((a0 as u32) + 1, (b0 as u32) + 1)),
590            _ => None,
591        }
592    }
593
594    /// Mirror a single cell value into the Arrow overlay if enabled.
595    /// Handles capacity growth, per-chunk overlay set, and heuristic compaction.
596    fn mirror_value_to_overlay(&mut self, sheet: &str, row: u32, col: u32, value: &LiteralValue) {
597        if !(self.config.arrow_storage_enabled && self.config.delta_overlay_enabled) {
598            return;
599        }
600        let asheet = match self.arrow_sheets.sheet_mut(sheet) {
601            Some(s) => s,
602            None => return,
603        };
604        let row0 = row.saturating_sub(1) as usize;
605        let col0 = col.saturating_sub(1) as usize;
606        if col0 >= asheet.columns.len() {
607            return;
608        }
609        if row0 >= asheet.nrows as usize {
610            asheet.ensure_row_capacity(row0 + 1);
611        }
612        if let Some((ch_idx, in_off)) = asheet.chunk_of_row(row0) {
613            use crate::arrow_store::OverlayValue;
614            let ov = match value {
615                LiteralValue::Empty => OverlayValue::Empty,
616                LiteralValue::Int(i) => OverlayValue::Number(*i as f64),
617                LiteralValue::Number(n) => OverlayValue::Number(*n),
618                LiteralValue::Boolean(b) => OverlayValue::Boolean(*b),
619                LiteralValue::Text(s) => OverlayValue::Text(std::sync::Arc::from(s.clone())),
620                LiteralValue::Error(e) => {
621                    OverlayValue::Error(crate::arrow_store::map_error_code(e.kind))
622                }
623                LiteralValue::Date(d) => {
624                    let dt = d.and_hms_opt(0, 0, 0).unwrap();
625                    let serial = crate::builtins::datetime::datetime_to_serial_for(
626                        self.config.date_system,
627                        &dt,
628                    );
629                    OverlayValue::Number(serial)
630                }
631                LiteralValue::DateTime(dt) => {
632                    let serial = crate::builtins::datetime::datetime_to_serial_for(
633                        self.config.date_system,
634                        dt,
635                    );
636                    OverlayValue::Number(serial)
637                }
638                LiteralValue::Time(t) => {
639                    let serial = t.num_seconds_from_midnight() as f64 / 86_400.0;
640                    OverlayValue::Number(serial)
641                }
642                LiteralValue::Duration(d) => {
643                    let serial = d.num_seconds() as f64 / 86_400.0;
644                    OverlayValue::Number(serial)
645                }
646                LiteralValue::Pending => OverlayValue::Pending,
647                LiteralValue::Array(_) => OverlayValue::Error(crate::arrow_store::map_error_code(
648                    formualizer_common::ExcelErrorKind::Value,
649                )),
650            };
651            let colref = &mut asheet.columns[col0];
652            let ch = &mut colref.chunks[ch_idx];
653            ch.overlay.set(in_off, ov);
654            // Heuristic compaction: > len/50 or > 1024
655            let abs_threshold = 1024usize;
656            let frac_den = 50usize;
657            if asheet.maybe_compact_chunk(col0, ch_idx, abs_threshold, frac_den) {
658                self.overlay_compactions = self.overlay_compactions.saturating_add(1);
659            }
660        }
661    }
662
663    /// Set a cell value
664    pub fn set_cell_value(
665        &mut self,
666        sheet: &str,
667        row: u32,
668        col: u32,
669        value: LiteralValue,
670    ) -> Result<(), ExcelError> {
671        self.graph.set_cell_value(sheet, row, col, value.clone())?;
672        // Mirror into Arrow overlay when enabled
673        self.mirror_value_to_overlay(sheet, row, col, &value);
674        // Advance snapshot to reflect external mutation
675        self.snapshot_id
676            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
677        self.has_edited = true;
678        Ok(())
679    }
680
681    /// Set a cell formula
682    pub fn set_cell_formula(
683        &mut self,
684        sheet: &str,
685        row: u32,
686        col: u32,
687        ast: ASTNode,
688    ) -> Result<(), ExcelError> {
689        let volatile = self.is_ast_volatile_with_provider(&ast);
690        self.graph
691            .set_cell_formula_with_volatility(sheet, row, col, ast, volatile)?;
692        // Advance snapshot to reflect external mutation
693        self.snapshot_id
694            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
695        self.has_edited = true;
696        Ok(())
697    }
698
699    /// Bulk set many formulas on a sheet. Skips per-cell snapshot bumping and minimizes edge rebuilds.
700    pub fn bulk_set_formulas<I>(&mut self, sheet: &str, items: I) -> Result<usize, ExcelError>
701    where
702        I: IntoIterator<Item = (u32, u32, ASTNode)>,
703    {
704        let n = self.graph.bulk_set_formulas(sheet, items)?;
705        // Single snapshot bump after batch
706        self.snapshot_id
707            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
708        if n > 0 {
709            self.has_edited = true;
710        }
711        Ok(n)
712    }
713
714    /// Get a cell value
715    pub fn get_cell_value(&self, sheet: &str, row: u32, col: u32) -> Option<LiteralValue> {
716        // Prefer Arrow for non-formula cells. For formula cells, use graph value.
717        if let Some(sheet_id) = self.graph.sheet_id(sheet) {
718            // If a formula exists at this address, return graph value
719            let coord = Coord::new(row, col, true, true);
720            let addr = CellRef::new(sheet_id, coord);
721            if let Some(vid) = self.graph.get_vertex_id_for_address(&addr) {
722                match self.graph.get_vertex_kind(*vid) {
723                    VertexKind::FormulaScalar | VertexKind::FormulaArray => {
724                        return self.graph.get_cell_value(sheet, row, col);
725                    }
726                    _ => {}
727                }
728            }
729        }
730        if let Some(asheet) = self.sheet_store().sheet(sheet) {
731            let r0 = row.saturating_sub(1) as usize;
732            let c0 = col.saturating_sub(1) as usize;
733            let av = asheet.range_view(r0, c0, r0, c0);
734            let v = av.get_cell(0, 0);
735            return Some(v);
736        }
737        self.graph.get_cell_value(sheet, row, col)
738    }
739
740    /// Get formula AST (if any) and current stored value for a cell
741    pub fn get_cell(
742        &self,
743        sheet: &str,
744        row: u32,
745        col: u32,
746    ) -> Option<(Option<formualizer_parse::ASTNode>, Option<LiteralValue>)> {
747        let v = self.get_cell_value(sheet, row, col);
748        let sheet_id = self.graph.sheet_id(sheet)?;
749        let coord = Coord::new(row, col, true, true);
750        let cell = CellRef::new(sheet_id, coord);
751        let vid = self.graph.get_vertex_for_cell(&cell)?;
752        let ast = self.graph.get_formula(vid);
753        Some((ast, v))
754    }
755
756    /// Begin batch operations - defer CSR rebuilds for better performance
757    pub fn begin_batch(&mut self) {
758        self.graph.begin_batch();
759    }
760
761    /// End batch operations and trigger CSR rebuild
762    pub fn end_batch(&mut self) {
763        self.graph.end_batch();
764    }
765
766    /// Evaluate a single vertex.
767    /// This is the core of the sequential evaluation logic for Milestone 3.1.
768    pub fn evaluate_vertex(&mut self, vertex_id: VertexId) -> Result<LiteralValue, ExcelError> {
769        // Check if vertex exists
770        if !self.graph.vertex_exists(vertex_id) {
771            return Err(ExcelError::new(formualizer_common::ExcelErrorKind::Ref)
772                .with_message(format!("Vertex not found: {vertex_id:?}")));
773        }
774
775        // Get vertex kind and check if it needs evaluation
776        let kind = self.graph.get_vertex_kind(vertex_id);
777        let sheet_id = self.graph.get_vertex_sheet_id(vertex_id);
778
779        let ast = match kind {
780            VertexKind::FormulaScalar => {
781                // Get the formula AST
782                if let Some(ast) = self.graph.get_formula(vertex_id) {
783                    ast.clone()
784                } else {
785                    return Ok(LiteralValue::Int(0));
786                }
787            }
788            VertexKind::Empty | VertexKind::Cell => {
789                // Check if there's a value stored
790                if let Some(value) = self.graph.get_value(vertex_id) {
791                    return Ok(value.clone());
792                } else {
793                    return Ok(LiteralValue::Int(0)); // Empty cells evaluate to 0
794                }
795            }
796            VertexKind::FormulaArray => {
797                if let Some(ast) = self.graph.get_formula(vertex_id) {
798                    ast.clone()
799                } else {
800                    return Ok(LiteralValue::Int(0));
801                }
802            }
803            VertexKind::InfiniteRange | VertexKind::Range | VertexKind::External => {
804                // Not directly evaluatable here; return stored or 0
805                if let Some(value) = self.graph.get_value(vertex_id) {
806                    return Ok(value.clone());
807                } else {
808                    return Ok(LiteralValue::Int(0));
809                }
810            }
811        };
812
813        // The interpreter uses a reference to the engine as the context.
814        let sheet_name = self.graph.sheet_name(sheet_id);
815        let cell_ref = self
816            .graph
817            .get_cell_ref(vertex_id)
818            .expect("cell ref for vertex");
819        let interpreter = Interpreter::new_with_cell(self, sheet_name, cell_ref);
820        let result = interpreter.evaluate_ast(&ast);
821
822        // If array result, perform spill from the anchor cell
823        match result {
824            Ok(LiteralValue::Array(rows)) => {
825                // Update kind to FormulaArray for tracking
826                self.graph
827                    .set_kind(vertex_id, crate::engine::vertex::VertexKind::FormulaArray);
828                // Build target cells rectangle starting from anchor
829                let anchor = self
830                    .graph
831                    .get_cell_ref(vertex_id)
832                    .expect("cell ref for vertex");
833                let sheet_id = anchor.sheet_id;
834                let h = rows.len() as u32;
835                let w = rows.first().map(|r| r.len()).unwrap_or(0) as u32;
836                // Bounds check to avoid out-of-range writes (align to PackedCoord capacity)
837                const PACKED_MAX_ROW: u32 = 1_048_575; // 20-bit max
838                const PACKED_MAX_COL: u32 = 16_383; // 14-bit max
839                let end_row = anchor.coord.row.saturating_add(h).saturating_sub(1);
840                let end_col = anchor.coord.col.saturating_add(w).saturating_sub(1);
841                if end_row > PACKED_MAX_ROW || end_col > PACKED_MAX_COL {
842                    let spill_err = ExcelError::new(ExcelErrorKind::Spill)
843                        .with_message("Spill exceeds sheet bounds")
844                        .with_extra(formualizer_common::ExcelErrorExtra::Spill {
845                            expected_rows: h,
846                            expected_cols: w,
847                        });
848                    let spill_val = LiteralValue::Error(spill_err.clone());
849                    self.graph.update_vertex_value(vertex_id, spill_val.clone());
850                    return Ok(spill_val);
851                }
852                let mut targets = Vec::new();
853                for r in 0..h {
854                    for c in 0..w {
855                        targets.push(self.graph.make_cell_ref_internal(
856                            sheet_id,
857                            anchor.coord.row + r,
858                            anchor.coord.col + c,
859                        ));
860                    }
861                }
862
863                // Plan spill via spill manager shim
864                match self.spill_mgr.reserve(
865                    vertex_id,
866                    anchor,
867                    SpillShape { rows: h, cols: w },
868                    SpillMeta {
869                        epoch: self.recalc_epoch,
870                        config: self.config.spill,
871                    },
872                ) {
873                    Ok(()) => {
874                        // Commit: write values to grid
875                        // Default conflict policy is Error + FirstWins; reserve() enforces in-flight locks
876                        // and plan_spill_region enforces overlap with committed formulas/spills/values.
877                        if let Err(e) =
878                            self.commit_spill_and_mirror(vertex_id, &targets, rows.clone())
879                        {
880                            // If commit fails, mark as error
881                            self.graph
882                                .update_vertex_value(vertex_id, LiteralValue::Error(e.clone()));
883                            return Ok(LiteralValue::Error(e));
884                        }
885                        // Anchor shows the top-left value, like Excel
886                        let top_left = rows
887                            .first()
888                            .and_then(|r| r.first())
889                            .cloned()
890                            .unwrap_or(LiteralValue::Empty);
891                        self.graph.update_vertex_value(vertex_id, top_left.clone());
892                        Ok(top_left)
893                    }
894                    Err(e) => {
895                        let spill_err = ExcelError::new(ExcelErrorKind::Spill)
896                            .with_message(e.message.unwrap_or_else(|| "Spill blocked".to_string()))
897                            .with_extra(formualizer_common::ExcelErrorExtra::Spill {
898                                expected_rows: h,
899                                expected_cols: w,
900                            });
901                        let spill_val = LiteralValue::Error(spill_err.clone());
902                        self.graph.update_vertex_value(vertex_id, spill_val.clone());
903                        Ok(spill_val)
904                    }
905                }
906            }
907            Ok(other) => {
908                // Scalar result: store value and ensure any previous spill is cleared
909                self.graph.clear_spill_region(vertex_id);
910                self.graph.update_vertex_value(vertex_id, other.clone());
911                // Optionally mirror into Arrow overlay for Arrow-backed reads
912                if self.config.arrow_storage_enabled
913                    && self.config.delta_overlay_enabled
914                    && self.config.write_formula_overlay_enabled
915                {
916                    let anchor = self
917                        .graph
918                        .get_cell_ref(vertex_id)
919                        .expect("cell ref for vertex");
920                    let sheet_name = self.graph.sheet_name(anchor.sheet_id).to_string();
921                    self.mirror_value_to_overlay(
922                        &sheet_name,
923                        anchor.coord.row,
924                        anchor.coord.col,
925                        &other,
926                    );
927                }
928                Ok(other)
929            }
930            Err(e) => {
931                // Runtime Excel error: store as a cell value instead of propagating
932                // as an exception so bulk eval paths don't fail the whole pass.
933                self.graph.clear_spill_region(vertex_id);
934                let err_val = LiteralValue::Error(e.clone());
935                self.graph.update_vertex_value(vertex_id, err_val.clone());
936                if self.config.arrow_storage_enabled
937                    && self.config.delta_overlay_enabled
938                    && self.config.write_formula_overlay_enabled
939                {
940                    let anchor = self
941                        .graph
942                        .get_cell_ref(vertex_id)
943                        .expect("cell ref for vertex");
944                    let sheet_name = self.graph.sheet_name(anchor.sheet_id).to_string();
945                    self.mirror_value_to_overlay(
946                        &sheet_name,
947                        anchor.coord.row,
948                        anchor.coord.col,
949                        &err_val,
950                    );
951                }
952                Ok(err_val)
953            }
954        }
955    }
956
957    /// Evaluate only the necessary precedents for specific target cells (demand-driven)
958    pub fn evaluate_until(&mut self, targets: &[&str]) -> Result<EvalResult, ExcelError> {
959        #[cfg(feature = "tracing")]
960        let _span_eval = tracing::info_span!("evaluate_until", targets = targets.len()).entered();
961        let start = std::time::Instant::now();
962
963        // Parse target cell addresses
964        let mut target_addrs = Vec::new();
965        for target in targets {
966            // For now, assume simple A1-style references on default sheet
967            // TODO: Parse complex references with sheets
968            let (sheet, row, col) = Self::parse_a1_notation(target)?;
969            let sheet_id = self.graph.sheet_id_mut(&sheet);
970            let coord = Coord::new(row, col, true, true);
971            target_addrs.push(CellRef::new(sheet_id, coord));
972        }
973
974        // Find vertex IDs for targets
975        let mut target_vertex_ids = Vec::new();
976        for addr in &target_addrs {
977            if let Some(vertex_id) = self.graph.get_vertex_id_for_address(addr) {
978                target_vertex_ids.push(*vertex_id);
979            }
980        }
981
982        if target_vertex_ids.is_empty() {
983            return Ok(EvalResult {
984                computed_vertices: 0,
985                cycle_errors: 0,
986                elapsed: start.elapsed(),
987            });
988        }
989
990        // Phase 1: Global warmup planning (no-op by default)
991        if self.config.warmup.warmup_enabled {
992            let mut pass_ctx = PassContext::new(&self.config.warmup);
993            let planner = PassPlanner::new(self.config.warmup.clone());
994
995            // Collect ASTs from target vertices for analysis
996            let mut target_asts = Vec::new();
997            for &vid in &target_vertex_ids {
998                if let Some(ast) = self.graph.get_formula(vid) {
999                    target_asts.push(ast);
1000                }
1001            }
1002
1003            // Analyze and plan warmup
1004            let target_refs: Vec<&ASTNode> = target_asts.iter().collect();
1005            let plan = planner.analyze_targets(&target_refs);
1006
1007            // Execute warmup
1008            let executor = WarmupExecutor::new(self.config.warmup.clone());
1009            let fctx = crate::traits::DefaultFunctionContext::new(self, None);
1010            let _ = executor.execute(&plan, &mut pass_ctx, &fctx);
1011
1012            // Store pass context for use during evaluation (read-only)
1013            self.warmup_pass_ctx = Some(pass_ctx);
1014        }
1015
1016        // Build demand subgraph with virtual edges for compressed ranges
1017        #[cfg(feature = "tracing")]
1018        let _span_sub = tracing::info_span!("demand_subgraph_build").entered();
1019        let (precedents_to_eval, vdeps) = self.build_demand_subgraph(&target_vertex_ids);
1020        #[cfg(feature = "tracing")]
1021        drop(_span_sub);
1022
1023        if precedents_to_eval.is_empty() {
1024            return Ok(EvalResult {
1025                computed_vertices: 0,
1026                cycle_errors: 0,
1027                elapsed: start.elapsed(),
1028            });
1029        }
1030
1031        // Create schedule for the minimal subgraph, honoring virtual edges
1032        let scheduler = Scheduler::new(&self.graph);
1033        #[cfg(feature = "tracing")]
1034        let _span_sched =
1035            tracing::info_span!("schedule_build", vertices = precedents_to_eval.len()).entered();
1036        let schedule = scheduler.create_schedule_with_virtual(&precedents_to_eval, &vdeps)?;
1037        #[cfg(feature = "tracing")]
1038        drop(_span_sched);
1039
1040        // Handle cycles first
1041        let mut cycle_errors = 0;
1042        for cycle in &schedule.cycles {
1043            cycle_errors += 1;
1044            let circ_error = LiteralValue::Error(
1045                ExcelError::new(ExcelErrorKind::Circ)
1046                    .with_message("Circular dependency detected".to_string()),
1047            );
1048            for &vertex_id in cycle {
1049                self.graph
1050                    .update_vertex_value(vertex_id, circ_error.clone());
1051            }
1052        }
1053
1054        // Evaluate layers (parallel when enabled, mirroring evaluate_all)
1055        let mut computed_vertices = 0;
1056        for layer in &schedule.layers {
1057            if self.thread_pool.is_some() && layer.vertices.len() > 1 {
1058                computed_vertices += self.evaluate_layer_parallel(layer)?;
1059            } else {
1060                computed_vertices += self.evaluate_layer_sequential(layer)?;
1061            }
1062        }
1063
1064        // Clear warmup context at end of evaluation
1065        self.warmup_pass_ctx = None;
1066
1067        // Clear dirty flags for evaluated vertices
1068        self.graph.clear_dirty_flags(&precedents_to_eval);
1069
1070        // Re-dirty volatile vertices
1071        self.graph.redirty_volatiles();
1072
1073        Ok(EvalResult {
1074            computed_vertices,
1075            cycle_errors,
1076            elapsed: start.elapsed(),
1077        })
1078    }
1079
1080    /// Evaluate all dirty/volatile vertices
1081    pub fn evaluate_all(&mut self) -> Result<EvalResult, ExcelError> {
1082        if self.config.defer_graph_building {
1083            // Build graph for all staged formulas before evaluating
1084            self.build_graph_all()?;
1085        }
1086        #[cfg(feature = "tracing")]
1087        let _span_eval = tracing::info_span!("evaluate_all").entered();
1088        let start = std::time::Instant::now();
1089        let mut computed_vertices = 0;
1090        let mut cycle_errors = 0;
1091
1092        let to_evaluate = self.graph.get_evaluation_vertices();
1093        if to_evaluate.is_empty() {
1094            return Ok(EvalResult {
1095                computed_vertices,
1096                cycle_errors,
1097                elapsed: start.elapsed(),
1098            });
1099        }
1100
1101        let scheduler = Scheduler::new(&self.graph);
1102        let schedule = scheduler.create_schedule(&to_evaluate)?;
1103
1104        // Handle cycles first by marking them with #CIRC!
1105        for cycle in &schedule.cycles {
1106            cycle_errors += 1;
1107            let circ_error = LiteralValue::Error(
1108                ExcelError::new(ExcelErrorKind::Circ)
1109                    .with_message("Circular dependency detected".to_string()),
1110            );
1111            for &vertex_id in cycle {
1112                self.graph
1113                    .update_vertex_value(vertex_id, circ_error.clone());
1114            }
1115        }
1116
1117        // Evaluate acyclic layers (parallel or sequential based on config)
1118        for layer in &schedule.layers {
1119            if self.thread_pool.is_some() && layer.vertices.len() > 1 {
1120                computed_vertices += self.evaluate_layer_parallel(layer)?;
1121            } else {
1122                computed_vertices += self.evaluate_layer_sequential(layer)?;
1123            }
1124        }
1125
1126        // Clear dirty flags for all evaluated vertices (including cycles)
1127        self.graph.clear_dirty_flags(&to_evaluate);
1128
1129        // Re-dirty volatile vertices for the next evaluation cycle
1130        self.graph.redirty_volatiles();
1131
1132        // Advance recalc epoch after a full evaluation pass finishes
1133        self.recalc_epoch = self.recalc_epoch.wrapping_add(1);
1134
1135        Ok(EvalResult {
1136            computed_vertices,
1137            cycle_errors,
1138            elapsed: start.elapsed(),
1139        })
1140    }
1141
1142    /// Convenience: demand-driven evaluation of a single cell by sheet name and row/col.
1143    ///
1144    /// This will evaluate only the minimal set of dirty / volatile precedents required
1145    /// to bring the target cell up-to-date (as if a user asked for that single value),
1146    /// rather than scheduling a full workbook recalc. If the cell is already clean and
1147    /// non-volatile, no vertices will be recomputed.
1148    ///
1149    /// Returns the (possibly newly computed) value stored for the cell afterwards.
1150    /// Empty cells return None. Errors are surfaced via the Result type.
1151    pub fn evaluate_cell(
1152        &mut self,
1153        sheet: &str,
1154        row: u32,
1155        col: u32,
1156    ) -> Result<Option<LiteralValue>, ExcelError> {
1157        if self.config.defer_graph_building {
1158            self.build_graph_for_sheets(std::iter::once(sheet))?;
1159        }
1160        let addr = format!("{}!{}{}", sheet, Self::col_to_letters(col), row);
1161        let _ = self.evaluate_until(&[addr.as_str()])?; // ignore detailed EvalResult here
1162        Ok(self.get_cell_value(sheet, row, col))
1163    }
1164
1165    /// Convenience: demand-driven evaluation of multiple cells; accepts a slice of
1166    /// (sheet, row, col) triples. The union of required dirty / volatile precedents
1167    /// is computed once and evaluated, which is typically faster than calling
1168    /// `evaluate_cell` repeatedly for a related set of targets.
1169    ///
1170    /// Returns the resulting values for each requested target in the same order.
1171    pub fn evaluate_cells(
1172        &mut self,
1173        targets: &[(&str, u32, u32)],
1174    ) -> Result<Vec<Option<LiteralValue>>, ExcelError> {
1175        if targets.is_empty() {
1176            return Ok(Vec::new());
1177        }
1178        if self.config.defer_graph_building {
1179            let mut sheets: rustc_hash::FxHashSet<&str> = rustc_hash::FxHashSet::default();
1180            for (s, _, _) in targets.iter() {
1181                sheets.insert(*s);
1182            }
1183            self.build_graph_for_sheets(sheets.iter().cloned())?;
1184        }
1185        let addresses: Vec<String> = targets
1186            .iter()
1187            .map(|(s, r, c)| format!("{}!{}{}", s, Self::col_to_letters(*c), r))
1188            .collect();
1189        let addr_refs: Vec<&str> = addresses.iter().map(|s| s.as_str()).collect();
1190        let _ = self.evaluate_until(&addr_refs)?;
1191        Ok(targets
1192            .iter()
1193            .map(|(s, r, c)| self.get_cell_value(s, *r, *c))
1194            .collect())
1195    }
1196
1197    /// Get the evaluation plan for target cells without actually evaluating them
1198    pub fn get_eval_plan(&self, targets: &[(&str, u32, u32)]) -> Result<EvalPlan, ExcelError> {
1199        if targets.is_empty() {
1200            return Ok(EvalPlan {
1201                total_vertices_to_evaluate: 0,
1202                layers: Vec::new(),
1203                cycles_detected: 0,
1204                dirty_count: 0,
1205                volatile_count: 0,
1206                parallel_enabled: self.config.enable_parallel && self.thread_pool.is_some(),
1207                estimated_parallel_layers: 0,
1208                target_cells: Vec::new(),
1209            });
1210        }
1211        if self.config.defer_graph_building {
1212            return Err(ExcelError::new(ExcelErrorKind::Value).with_message(
1213                "Evaluation plan requested with deferred graph; build first or call evaluate_*",
1214            ));
1215        }
1216
1217        // Convert targets to A1 notation for consistency
1218        let addresses: Vec<String> = targets
1219            .iter()
1220            .map(|(s, r, c)| format!("{}!{}{}", s, Self::col_to_letters(*c), r))
1221            .collect();
1222
1223        // Parse target cell addresses
1224        let mut target_addrs = Vec::new();
1225        for (sheet, row, col) in targets {
1226            if let Some(sheet_id) = self.graph.sheet_id(sheet) {
1227                let coord = Coord::new(*row, *col, true, true);
1228                target_addrs.push(CellRef::new(sheet_id, coord));
1229            }
1230        }
1231
1232        // Find vertex IDs for targets
1233        let mut target_vertex_ids = Vec::new();
1234        for addr in &target_addrs {
1235            if let Some(vertex_id) = self.graph.get_vertex_id_for_address(addr) {
1236                target_vertex_ids.push(*vertex_id);
1237            }
1238        }
1239
1240        if target_vertex_ids.is_empty() {
1241            return Ok(EvalPlan {
1242                total_vertices_to_evaluate: 0,
1243                layers: Vec::new(),
1244                cycles_detected: 0,
1245                dirty_count: 0,
1246                volatile_count: 0,
1247                parallel_enabled: self.config.enable_parallel && self.thread_pool.is_some(),
1248                estimated_parallel_layers: 0,
1249                target_cells: addresses,
1250            });
1251        }
1252
1253        // Build demand subgraph with virtual edges (same as evaluate_until)
1254        let (precedents_to_eval, vdeps) = self.build_demand_subgraph(&target_vertex_ids);
1255
1256        if precedents_to_eval.is_empty() {
1257            return Ok(EvalPlan {
1258                total_vertices_to_evaluate: 0,
1259                layers: Vec::new(),
1260                cycles_detected: 0,
1261                dirty_count: 0,
1262                volatile_count: 0,
1263                parallel_enabled: self.config.enable_parallel && self.thread_pool.is_some(),
1264                estimated_parallel_layers: 0,
1265                target_cells: addresses,
1266            });
1267        }
1268
1269        // Count dirty and volatile vertices
1270        let mut dirty_count = 0;
1271        let mut volatile_count = 0;
1272        for &vertex_id in &precedents_to_eval {
1273            if self.graph.is_dirty(vertex_id) {
1274                dirty_count += 1;
1275            }
1276            if self.graph.is_volatile(vertex_id) {
1277                volatile_count += 1;
1278            }
1279        }
1280
1281        // Create schedule for the minimal subgraph honoring virtual edges
1282        let scheduler = Scheduler::new(&self.graph);
1283        let schedule = scheduler.create_schedule_with_virtual(&precedents_to_eval, &vdeps)?;
1284
1285        // Build layer information
1286        let mut layers = Vec::new();
1287        let mut estimated_parallel_layers = 0;
1288        let parallel_enabled = self.config.enable_parallel && self.thread_pool.is_some();
1289
1290        for layer in &schedule.layers {
1291            let parallel_eligible = parallel_enabled && layer.vertices.len() > 1;
1292            if parallel_eligible {
1293                estimated_parallel_layers += 1;
1294            }
1295
1296            // Get sample cell addresses (up to 5)
1297            let sample_cells: Vec<String> = layer
1298                .vertices
1299                .iter()
1300                .take(5)
1301                .filter_map(|&vertex_id| {
1302                    self.graph
1303                        .get_cell_ref_for_vertex(vertex_id)
1304                        .map(|cell_ref| {
1305                            let sheet_name = self.graph.sheet_name(cell_ref.sheet_id);
1306                            format!(
1307                                "{}!{}{}",
1308                                sheet_name,
1309                                Self::col_to_letters(cell_ref.coord.col),
1310                                cell_ref.coord.row
1311                            )
1312                        })
1313                })
1314                .collect();
1315
1316            layers.push(LayerInfo {
1317                vertex_count: layer.vertices.len(),
1318                parallel_eligible,
1319                sample_cells,
1320            });
1321        }
1322
1323        Ok(EvalPlan {
1324            total_vertices_to_evaluate: precedents_to_eval.len(),
1325            layers,
1326            cycles_detected: schedule.cycles.len(),
1327            dirty_count,
1328            volatile_count,
1329            parallel_enabled,
1330            estimated_parallel_layers,
1331            target_cells: addresses,
1332        })
1333    }
1334
1335    /// Build a demand-driven subgraph for the given targets, including ephemeral edges for
1336    /// compressed ranges, and returning the set of dirty/volatile precedents and virtual deps.
1337    fn build_demand_subgraph(
1338        &self,
1339        target_vertices: &[VertexId],
1340    ) -> (
1341        Vec<VertexId>,
1342        rustc_hash::FxHashMap<VertexId, Vec<VertexId>>,
1343    ) {
1344        #[cfg(feature = "tracing")]
1345        let _span =
1346            tracing::info_span!("demand_subgraph", targets = target_vertices.len()).entered();
1347        use formualizer_parse::parser::ReferenceType;
1348        use rustc_hash::{FxHashMap, FxHashSet};
1349
1350        let mut to_evaluate: FxHashSet<VertexId> = FxHashSet::default();
1351        let mut visited: FxHashSet<VertexId> = FxHashSet::default();
1352        let mut stack: Vec<VertexId> = Vec::new();
1353        let mut vdeps: FxHashMap<VertexId, Vec<VertexId>> = FxHashMap::default(); // incoming deps per vertex
1354
1355        for &t in target_vertices {
1356            stack.push(t);
1357        }
1358
1359        while let Some(v) = stack.pop() {
1360            if !visited.insert(v) {
1361                continue;
1362            }
1363            if !self.graph.vertex_exists(v) {
1364                continue;
1365            }
1366            // Only schedule dirty/volatile formulas
1367            match self.graph.get_vertex_kind(v) {
1368                VertexKind::FormulaScalar | VertexKind::FormulaArray => {
1369                    if self.graph.is_dirty(v) || self.graph.is_volatile(v) {
1370                        to_evaluate.insert(v);
1371                    }
1372                }
1373                _ => {}
1374            }
1375
1376            // Explicit dependencies (graph edges)
1377            for dep in self.graph.get_dependencies(v) {
1378                if self.graph.vertex_exists(dep) {
1379                    match self.graph.get_vertex_kind(dep) {
1380                        VertexKind::FormulaScalar | VertexKind::FormulaArray => {
1381                            if !visited.contains(&dep) {
1382                                stack.push(dep);
1383                            }
1384                        }
1385                        _ => {}
1386                    }
1387                }
1388            }
1389
1390            // Compressed range dependencies → discover formula precedents in used/bounded window
1391            if let Some(ranges) = self.graph.get_range_dependencies(v) {
1392                let current_sheet_id = self.graph.get_vertex_sheet_id(v);
1393                for r in ranges {
1394                    if let ReferenceType::Range {
1395                        sheet,
1396                        start_row,
1397                        start_col,
1398                        end_row,
1399                        end_col,
1400                    } = r
1401                    {
1402                        let sheet_id = sheet
1403                            .as_ref()
1404                            .and_then(|name| self.graph.sheet_id(name))
1405                            .unwrap_or(current_sheet_id);
1406                        let sheet_name = self.graph.sheet_name(sheet_id);
1407
1408                        // Infer bounds like resolve_range_view
1409                        let mut sr = *start_row;
1410                        let mut sc = *start_col;
1411                        let mut er = *end_row;
1412                        let mut ec = *end_col;
1413
1414                        if sr.is_none() && er.is_none() {
1415                            let scv = sc.unwrap_or(1);
1416                            let ecv = ec.unwrap_or(scv);
1417                            if let Some((min_r, max_r)) =
1418                                self.graph.used_row_bounds_for_columns(sheet_id, scv, ecv)
1419                            {
1420                                sr = Some(min_r);
1421                                er = Some(max_r);
1422                            } else if let Some((max_rows, _)) = self.sheet_bounds(sheet_name) {
1423                                sr = Some(1);
1424                                er = Some(max_rows);
1425                            }
1426                        }
1427                        if sc.is_none() && ec.is_none() {
1428                            let srv = sr.unwrap_or(1);
1429                            let erv = er.unwrap_or(srv);
1430                            if let Some((min_c, max_c)) =
1431                                self.graph.used_col_bounds_for_rows(sheet_id, srv, erv)
1432                            {
1433                                sc = Some(min_c);
1434                                ec = Some(max_c);
1435                            } else if let Some((_, max_cols)) = self.sheet_bounds(sheet_name) {
1436                                sc = Some(1);
1437                                ec = Some(max_cols);
1438                            }
1439                        }
1440                        if sr.is_some() && er.is_none() {
1441                            let scv = sc.unwrap_or(1);
1442                            let ecv = ec.unwrap_or(scv);
1443                            if let Some((_, max_r)) =
1444                                self.graph.used_row_bounds_for_columns(sheet_id, scv, ecv)
1445                            {
1446                                er = Some(max_r);
1447                            } else if let Some((max_rows, _)) = self.sheet_bounds(sheet_name) {
1448                                er = Some(max_rows);
1449                            }
1450                        }
1451                        if er.is_some() && sr.is_none() {
1452                            let scv = sc.unwrap_or(1);
1453                            let ecv = ec.unwrap_or(scv);
1454                            if let Some((min_r, _)) =
1455                                self.graph.used_row_bounds_for_columns(sheet_id, scv, ecv)
1456                            {
1457                                sr = Some(min_r);
1458                            } else {
1459                                sr = Some(1);
1460                            }
1461                        }
1462                        if sc.is_some() && ec.is_none() {
1463                            let srv = sr.unwrap_or(1);
1464                            let erv = er.unwrap_or(srv);
1465                            if let Some((_, max_c)) =
1466                                self.graph.used_col_bounds_for_rows(sheet_id, srv, erv)
1467                            {
1468                                ec = Some(max_c);
1469                            } else if let Some((_, max_cols)) = self.sheet_bounds(sheet_name) {
1470                                ec = Some(max_cols);
1471                            }
1472                        }
1473                        if ec.is_some() && sc.is_none() {
1474                            let srv = sr.unwrap_or(1);
1475                            let erv = er.unwrap_or(srv);
1476                            if let Some((min_c, _)) =
1477                                self.graph.used_col_bounds_for_rows(sheet_id, srv, erv)
1478                            {
1479                                sc = Some(min_c);
1480                            } else {
1481                                sc = Some(1);
1482                            }
1483                        }
1484
1485                        let sr = sr.unwrap_or(1);
1486                        let sc = sc.unwrap_or(1);
1487                        let er = er.unwrap_or(sr.saturating_sub(1));
1488                        let ec = ec.unwrap_or(sc.saturating_sub(1));
1489                        if er < sr || ec < sc {
1490                            continue;
1491                        }
1492
1493                        if let Some(index) = self.graph.sheet_index(sheet_id) {
1494                            // enumerate vertices in col range, filter row and kind
1495                            for u in index.vertices_in_col_range(sc, ec) {
1496                                let pc = self.graph.vertex_coord(u);
1497                                let row = pc.row();
1498                                if row < sr || row > er {
1499                                    continue;
1500                                }
1501                                match self.graph.get_vertex_kind(u) {
1502                                    VertexKind::FormulaScalar | VertexKind::FormulaArray => {
1503                                        // only link and schedule if producer is dirty/volatile
1504                                        if (self.graph.is_dirty(u) || self.graph.is_volatile(u))
1505                                            && u != v
1506                                        {
1507                                            vdeps.entry(v).or_default().push(u);
1508                                            if !visited.contains(&u) {
1509                                                stack.push(u);
1510                                            }
1511                                        }
1512                                    }
1513                                    _ => {}
1514                                }
1515                            }
1516                        }
1517                    }
1518                }
1519            }
1520        }
1521
1522        let mut result: Vec<VertexId> = to_evaluate.into_iter().collect();
1523        result.sort_unstable();
1524        // Dedup virtual deps
1525        for deps in vdeps.values_mut() {
1526            deps.sort_unstable();
1527            deps.dedup();
1528        }
1529        (result, vdeps)
1530    }
1531
1532    /// Helper: convert 1-based column index to Excel-style letters (1 -> A, 27 -> AA)
1533    fn col_to_letters(mut col: u32) -> String {
1534        let mut s = String::new();
1535        while col > 0 {
1536            let rem = ((col - 1) % 26) as u8;
1537            s.push((b'A' + rem) as char);
1538            col = (col - 1) / 26;
1539        }
1540        s.chars().rev().collect()
1541    }
1542
1543    /// Evaluate all dirty/volatile vertices with cancellation support
1544    pub fn evaluate_all_cancellable(
1545        &mut self,
1546        cancel_flag: &AtomicBool,
1547    ) -> Result<EvalResult, ExcelError> {
1548        let start = std::time::Instant::now();
1549        let mut computed_vertices = 0;
1550        let mut cycle_errors = 0;
1551
1552        let to_evaluate = self.graph.get_evaluation_vertices();
1553        if to_evaluate.is_empty() {
1554            return Ok(EvalResult {
1555                computed_vertices,
1556                cycle_errors,
1557                elapsed: start.elapsed(),
1558            });
1559        }
1560
1561        let scheduler = Scheduler::new(&self.graph);
1562        let schedule = scheduler.create_schedule(&to_evaluate)?;
1563
1564        // Handle cycles first by marking them with #CIRC!
1565        for cycle in &schedule.cycles {
1566            // Check cancellation between cycles
1567            if cancel_flag.load(Ordering::Relaxed) {
1568                return Err(ExcelError::new(ExcelErrorKind::Cancelled)
1569                    .with_message("Evaluation cancelled during cycle handling".to_string()));
1570            }
1571
1572            cycle_errors += 1;
1573            let circ_error = LiteralValue::Error(
1574                ExcelError::new(ExcelErrorKind::Circ)
1575                    .with_message("Circular dependency detected".to_string()),
1576            );
1577            for &vertex_id in cycle {
1578                self.graph
1579                    .update_vertex_value(vertex_id, circ_error.clone());
1580            }
1581        }
1582
1583        // Evaluate acyclic layers sequentially with cancellation checks
1584        for layer in &schedule.layers {
1585            // Check cancellation between layers
1586            if cancel_flag.load(Ordering::Relaxed) {
1587                return Err(ExcelError::new(ExcelErrorKind::Cancelled)
1588                    .with_message("Evaluation cancelled between layers".to_string()));
1589            }
1590
1591            // Evaluate vertices in this layer (parallel or sequential)
1592            if self.thread_pool.is_some() && layer.vertices.len() > 1 {
1593                computed_vertices +=
1594                    self.evaluate_layer_parallel_cancellable(layer, cancel_flag)?;
1595            } else {
1596                computed_vertices +=
1597                    self.evaluate_layer_sequential_cancellable(layer, cancel_flag)?;
1598            }
1599        }
1600
1601        // Clear dirty flags for all evaluated vertices (including cycles)
1602        self.graph.clear_dirty_flags(&to_evaluate);
1603
1604        // Re-dirty volatile vertices for the next evaluation cycle
1605        self.graph.redirty_volatiles();
1606
1607        Ok(EvalResult {
1608            computed_vertices,
1609            cycle_errors,
1610            elapsed: start.elapsed(),
1611        })
1612    }
1613
1614    /// Evaluate only the necessary precedents for specific target cells with cancellation support
1615    pub fn evaluate_until_cancellable(
1616        &mut self,
1617        targets: &[&str],
1618        cancel_flag: &AtomicBool,
1619    ) -> Result<EvalResult, ExcelError> {
1620        let start = std::time::Instant::now();
1621
1622        // Parse target cell addresses
1623        let mut target_addrs = Vec::new();
1624        for target in targets {
1625            let (sheet, row, col) = Self::parse_a1_notation(target)?;
1626            let sheet_id = self.graph.sheet_id_mut(&sheet);
1627            let coord = Coord::new(row, col, true, true);
1628            target_addrs.push(CellRef::new(sheet_id, coord));
1629        }
1630
1631        // Find vertex IDs for targets
1632        let mut target_vertex_ids = Vec::new();
1633        for addr in &target_addrs {
1634            if let Some(vertex_id) = self.graph.get_vertex_id_for_address(addr) {
1635                target_vertex_ids.push(*vertex_id);
1636            }
1637        }
1638
1639        if target_vertex_ids.is_empty() {
1640            return Ok(EvalResult {
1641                computed_vertices: 0,
1642                cycle_errors: 0,
1643                elapsed: start.elapsed(),
1644            });
1645        }
1646
1647        // Build demand subgraph with virtual edges
1648        let (precedents_to_eval, vdeps) = self.build_demand_subgraph(&target_vertex_ids);
1649
1650        if precedents_to_eval.is_empty() {
1651            return Ok(EvalResult {
1652                computed_vertices: 0,
1653                cycle_errors: 0,
1654                elapsed: start.elapsed(),
1655            });
1656        }
1657
1658        // Create schedule honoring virtual edges
1659        let scheduler = Scheduler::new(&self.graph);
1660        let schedule = scheduler.create_schedule_with_virtual(&precedents_to_eval, &vdeps)?;
1661
1662        // Handle cycles first
1663        let mut cycle_errors = 0;
1664        for cycle in &schedule.cycles {
1665            // Check cancellation between cycles
1666            if cancel_flag.load(Ordering::Relaxed) {
1667                return Err(ExcelError::new(ExcelErrorKind::Cancelled).with_message(
1668                    "Demand-driven evaluation cancelled during cycle handling".to_string(),
1669                ));
1670            }
1671
1672            cycle_errors += 1;
1673            let circ_error = LiteralValue::Error(
1674                ExcelError::new(ExcelErrorKind::Circ)
1675                    .with_message("Circular dependency detected".to_string()),
1676            );
1677            for &vertex_id in cycle {
1678                self.graph
1679                    .update_vertex_value(vertex_id, circ_error.clone());
1680            }
1681        }
1682
1683        // Evaluate layers with cancellation checks
1684        let mut computed_vertices = 0;
1685        for layer in &schedule.layers {
1686            // Check cancellation between layers
1687            if cancel_flag.load(Ordering::Relaxed) {
1688                return Err(ExcelError::new(ExcelErrorKind::Cancelled).with_message(
1689                    "Demand-driven evaluation cancelled between layers".to_string(),
1690                ));
1691            }
1692
1693            // Evaluate vertices in this layer (parallel or sequential)
1694            if self.thread_pool.is_some() && layer.vertices.len() > 1 {
1695                computed_vertices +=
1696                    self.evaluate_layer_parallel_cancellable(layer, cancel_flag)?;
1697            } else {
1698                computed_vertices +=
1699                    self.evaluate_layer_sequential_cancellable_demand_driven(layer, cancel_flag)?;
1700            }
1701        }
1702
1703        // Clear dirty flags for evaluated vertices
1704        self.graph.clear_dirty_flags(&precedents_to_eval);
1705
1706        // Re-dirty volatile vertices
1707        self.graph.redirty_volatiles();
1708
1709        Ok(EvalResult {
1710            computed_vertices,
1711            cycle_errors,
1712            elapsed: start.elapsed(),
1713        })
1714    }
1715
1716    fn parse_a1_notation(address: &str) -> Result<(String, u32, u32), ExcelError> {
1717        let parts: Vec<&str> = address.split('!').collect();
1718        let (sheet, cell_part) = if parts.len() == 2 {
1719            (parts[0].to_string(), parts[1])
1720        } else {
1721            ("Sheet1".to_string(), address) // Assume default sheet if not specified
1722        };
1723
1724        let mut col_end = 0;
1725        for (i, c) in cell_part.chars().enumerate() {
1726            if c.is_alphabetic() {
1727                col_end = i + 1;
1728            } else {
1729                break;
1730            }
1731        }
1732
1733        let col_str = &cell_part[..col_end];
1734        let row_str = &cell_part[col_end..];
1735
1736        let row = row_str.parse::<u32>().map_err(|_| {
1737            ExcelError::new(ExcelErrorKind::Ref).with_message(format!("Invalid row: {row_str}"))
1738        })?;
1739
1740        let mut col = 0;
1741        for c in col_str.to_uppercase().chars() {
1742            col = col * 26 + (c as u32 - 'A' as u32) + 1; // +1 for 1-based indexing
1743        }
1744
1745        Ok((sheet, row, col))
1746    }
1747
1748    /// Determine volatility using this engine's FunctionProvider, falling back to global registry.
1749    fn is_ast_volatile_with_provider(&self, ast: &ASTNode) -> bool {
1750        use formualizer_parse::parser::ASTNodeType;
1751        match &ast.node_type {
1752            ASTNodeType::Function { name, args, .. } => {
1753                if let Some(func) = self
1754                    .get_function("", name)
1755                    .or_else(|| crate::function_registry::get("", name))
1756                {
1757                    if func.caps().contains(crate::function::FnCaps::VOLATILE) {
1758                        return true;
1759                    }
1760                }
1761                args.iter()
1762                    .any(|arg| self.is_ast_volatile_with_provider(arg))
1763            }
1764            ASTNodeType::BinaryOp { left, right, .. } => {
1765                self.is_ast_volatile_with_provider(left)
1766                    || self.is_ast_volatile_with_provider(right)
1767            }
1768            ASTNodeType::UnaryOp { expr, .. } => self.is_ast_volatile_with_provider(expr),
1769            ASTNodeType::Array(rows) => rows.iter().any(|row| {
1770                row.iter()
1771                    .any(|cell| self.is_ast_volatile_with_provider(cell))
1772            }),
1773            _ => false,
1774        }
1775    }
1776
1777    /// Find dirty precedents that need evaluation for the given target vertices
1778    fn find_dirty_precedents(&self, target_vertices: &[VertexId]) -> Vec<VertexId> {
1779        use rustc_hash::FxHashSet;
1780
1781        let mut to_evaluate = FxHashSet::default();
1782        let mut visited = FxHashSet::default();
1783        let mut stack = Vec::new();
1784
1785        // Start reverse traversal from target vertices
1786        for &target in target_vertices {
1787            stack.push(target);
1788        }
1789
1790        while let Some(vertex_id) = stack.pop() {
1791            if !visited.insert(vertex_id) {
1792                continue; // Already processed
1793            }
1794
1795            if self.graph.vertex_exists(vertex_id) {
1796                // Check if this vertex needs evaluation
1797                let kind = self.graph.get_vertex_kind(vertex_id);
1798                let needs_eval = match kind {
1799                    super::vertex::VertexKind::FormulaScalar
1800                    | super::vertex::VertexKind::FormulaArray => {
1801                        self.graph.is_dirty(vertex_id) || self.graph.is_volatile(vertex_id)
1802                    }
1803                    _ => false, // Values and empty cells don't need evaluation
1804                };
1805
1806                if needs_eval {
1807                    to_evaluate.insert(vertex_id);
1808                }
1809
1810                // Continue traversal to dependencies (precedents)
1811                let dependencies = self.graph.get_dependencies(vertex_id);
1812                for &dep_id in &dependencies {
1813                    if !visited.contains(&dep_id) {
1814                        stack.push(dep_id);
1815                    }
1816                }
1817            }
1818        }
1819
1820        let mut result: Vec<VertexId> = to_evaluate.into_iter().collect();
1821        result.sort_unstable();
1822        result
1823    }
1824
1825    /// Evaluate a layer sequentially
1826    fn evaluate_layer_sequential(
1827        &mut self,
1828        layer: &super::scheduler::Layer,
1829    ) -> Result<usize, ExcelError> {
1830        for &vertex_id in &layer.vertices {
1831            self.evaluate_vertex(vertex_id)?;
1832        }
1833        Ok(layer.vertices.len())
1834    }
1835
1836    /// Evaluate a layer sequentially with cancellation support
1837    fn evaluate_layer_sequential_cancellable(
1838        &mut self,
1839        layer: &super::scheduler::Layer,
1840        cancel_flag: &AtomicBool,
1841    ) -> Result<usize, ExcelError> {
1842        for (i, &vertex_id) in layer.vertices.iter().enumerate() {
1843            // Check cancellation every 256 vertices to balance responsiveness with performance
1844            if i % 256 == 0 && cancel_flag.load(Ordering::Relaxed) {
1845                return Err(ExcelError::new(ExcelErrorKind::Cancelled)
1846                    .with_message("Evaluation cancelled within layer".to_string()));
1847            }
1848
1849            self.evaluate_vertex(vertex_id)?;
1850        }
1851        Ok(layer.vertices.len())
1852    }
1853
1854    /// Evaluate a layer sequentially with more frequent cancellation checks for demand-driven evaluation
1855    fn evaluate_layer_sequential_cancellable_demand_driven(
1856        &mut self,
1857        layer: &super::scheduler::Layer,
1858        cancel_flag: &AtomicBool,
1859    ) -> Result<usize, ExcelError> {
1860        for (i, &vertex_id) in layer.vertices.iter().enumerate() {
1861            // Check cancellation more frequently for demand-driven evaluation (every 128 vertices)
1862            if i % 128 == 0 && cancel_flag.load(Ordering::Relaxed) {
1863                return Err(ExcelError::new(ExcelErrorKind::Cancelled)
1864                    .with_message("Demand-driven evaluation cancelled within layer".to_string()));
1865            }
1866
1867            self.evaluate_vertex(vertex_id)?;
1868        }
1869        Ok(layer.vertices.len())
1870    }
1871
1872    /// Evaluate a layer in parallel using the thread pool
1873    fn evaluate_layer_parallel(
1874        &mut self,
1875        layer: &super::scheduler::Layer,
1876    ) -> Result<usize, ExcelError> {
1877        use rayon::prelude::*;
1878
1879        let thread_pool = self.thread_pool.as_ref().unwrap();
1880
1881        // Collect all evaluation results first, then update the graph sequentially
1882        let results: Result<Vec<(VertexId, LiteralValue)>, ExcelError> =
1883            thread_pool.install(|| {
1884                layer
1885                    .vertices
1886                    .par_iter()
1887                    .map(
1888                        |&vertex_id| match self.evaluate_vertex_immutable(vertex_id) {
1889                            Ok(v) => Ok((vertex_id, v)),
1890                            Err(e) => Ok((vertex_id, LiteralValue::Error(e))),
1891                        },
1892                    )
1893                    .collect()
1894            });
1895
1896        // Update the graph with results sequentially (thread-safe)
1897        match results {
1898            Ok(vertex_results) => {
1899                for (vertex_id, result) in vertex_results {
1900                    self.graph.update_vertex_value(vertex_id, result);
1901                }
1902                Ok(layer.vertices.len())
1903            }
1904            Err(e) => Err(e),
1905        }
1906    }
1907
1908    /// Evaluate a layer in parallel with cancellation support
1909    fn evaluate_layer_parallel_cancellable(
1910        &mut self,
1911        layer: &super::scheduler::Layer,
1912        cancel_flag: &AtomicBool,
1913    ) -> Result<usize, ExcelError> {
1914        use rayon::prelude::*;
1915
1916        let thread_pool = self.thread_pool.as_ref().unwrap();
1917
1918        // Check cancellation before starting parallel work
1919        if cancel_flag.load(Ordering::Relaxed) {
1920            return Err(ExcelError::new(ExcelErrorKind::Cancelled)
1921                .with_message("Parallel evaluation cancelled before starting".to_string()));
1922        }
1923
1924        // Collect all evaluation results first, then update the graph sequentially
1925        let results: Result<Vec<(VertexId, LiteralValue)>, ExcelError> =
1926            thread_pool.install(|| {
1927                layer
1928                    .vertices
1929                    .par_iter()
1930                    .map(|&vertex_id| {
1931                        // Check cancellation periodically during parallel work
1932                        if cancel_flag.load(Ordering::Relaxed) {
1933                            return Err(ExcelError::new(ExcelErrorKind::Cancelled).with_message(
1934                                "Parallel evaluation cancelled during execution".to_string(),
1935                            ));
1936                        }
1937                        match self.evaluate_vertex_immutable(vertex_id) {
1938                            Ok(v) => Ok((vertex_id, v)),
1939                            Err(e) => Ok((vertex_id, LiteralValue::Error(e))),
1940                        }
1941                    })
1942                    .collect()
1943            });
1944
1945        // Update the graph with results sequentially (thread-safe)
1946        match results {
1947            Ok(vertex_results) => {
1948                for (vertex_id, result) in vertex_results {
1949                    self.graph.update_vertex_value(vertex_id, result);
1950                }
1951                Ok(layer.vertices.len())
1952            }
1953            Err(e) => Err(e),
1954        }
1955    }
1956
1957    /// Evaluate a single vertex without mutating the graph (for parallel evaluation)
1958    fn evaluate_vertex_immutable(&self, vertex_id: VertexId) -> Result<LiteralValue, ExcelError> {
1959        // Check if vertex exists
1960        if !self.graph.vertex_exists(vertex_id) {
1961            return Err(ExcelError::new(formualizer_common::ExcelErrorKind::Ref)
1962                .with_message(format!("Vertex not found: {vertex_id:?}")));
1963        }
1964
1965        // Get vertex kind and check if it needs evaluation
1966        let kind = self.graph.get_vertex_kind(vertex_id);
1967        let sheet_id = self.graph.get_vertex_sheet_id(vertex_id);
1968
1969        let ast = match kind {
1970            VertexKind::FormulaScalar => {
1971                // Get the formula AST
1972                if let Some(ast) = self.graph.get_formula(vertex_id) {
1973                    ast.clone()
1974                } else {
1975                    return Ok(LiteralValue::Int(0));
1976                }
1977            }
1978            VertexKind::Empty | VertexKind::Cell => {
1979                // Check if there's a value stored
1980                if let Some(value) = self.graph.get_value(vertex_id) {
1981                    return Ok(value.clone());
1982                } else {
1983                    return Ok(LiteralValue::Int(0)); // Empty cells evaluate to 0
1984                }
1985            }
1986            _ => {
1987                return Ok(LiteralValue::Error(
1988                    ExcelError::new(formualizer_common::ExcelErrorKind::Na)
1989                        .with_message("Array formulas not yet supported".to_string()),
1990                ));
1991            }
1992        };
1993
1994        // The interpreter uses a reference to the engine as the context
1995        let sheet_name = self.graph.sheet_name(sheet_id);
1996        let cell_ref = self
1997            .graph
1998            .get_cell_ref(vertex_id)
1999            .expect("cell ref for vertex");
2000        let interpreter = Interpreter::new_with_cell(self, sheet_name, cell_ref);
2001        interpreter.evaluate_ast(&ast)
2002    }
2003
2004    /// Get access to the shared thread pool for parallel evaluation
2005    pub fn thread_pool(&self) -> Option<&Arc<rayon::ThreadPool>> {
2006        self.thread_pool.as_ref()
2007    }
2008}
2009
2010#[derive(Default)]
2011struct RowBoundsCache {
2012    snapshot: u64,
2013    // key: (sheet_id, col_idx)
2014    map: rustc_hash::FxHashMap<(u32, usize), (Option<u32>, Option<u32>)>,
2015}
2016
2017impl RowBoundsCache {
2018    fn new(snapshot: u64) -> Self {
2019        Self {
2020            snapshot,
2021            map: Default::default(),
2022        }
2023    }
2024    fn get_row_bounds(
2025        &self,
2026        sheet_id: SheetId,
2027        col_idx: usize,
2028        snapshot: u64,
2029    ) -> Option<(Option<u32>, Option<u32>)> {
2030        if self.snapshot != snapshot {
2031            return None;
2032        }
2033        self.map.get(&(sheet_id as u32, col_idx)).copied()
2034    }
2035    fn put_row_bounds(
2036        &mut self,
2037        sheet_id: SheetId,
2038        col_idx: usize,
2039        snapshot: u64,
2040        bounds: (Option<u32>, Option<u32>),
2041    ) {
2042        if self.snapshot != snapshot {
2043            self.snapshot = snapshot;
2044            self.map.clear();
2045        }
2046        self.map.insert((sheet_id as u32, col_idx), bounds);
2047    }
2048}
2049
2050// ───────────── Criteria Mask Cache (Arrow) ─────────────
2051#[derive(Default)]
2052struct MaskCache {
2053    snapshot: u64,
2054    cap: usize,
2055    // Simple FIFO map for now; key -> mask
2056    map: rustc_hash::FxHashMap<String, std::sync::Arc<arrow_array::BooleanArray>>,
2057    order: std::collections::VecDeque<String>,
2058    hits: u64,
2059    misses: u64,
2060}
2061
2062impl MaskCache {
2063    fn new(snapshot: u64, cap: usize) -> Self {
2064        Self {
2065            snapshot,
2066            cap,
2067            map: Default::default(),
2068            order: Default::default(),
2069            hits: 0,
2070            misses: 0,
2071        }
2072    }
2073    fn clear_and_set_snapshot(&mut self, snap: u64) {
2074        self.snapshot = snap;
2075        self.map.clear();
2076        self.order.clear();
2077        self.hits = 0;
2078        self.misses = 0;
2079    }
2080
2081    fn key_for(
2082        &self,
2083        engine: &Engine<impl crate::traits::EvaluationContext>,
2084        view: &crate::arrow_store::ArrowRangeView<'_>,
2085        col_in_view: usize,
2086        pred: &crate::args::CriteriaPredicate,
2087    ) -> Option<String> {
2088        let sheet_id = engine.graph.sheet_id(view.sheet_name())? as u32;
2089        let abs_col = view.start_col().saturating_add(col_in_view);
2090        let sr = view.start_row();
2091        let er = view.end_row();
2092        use crate::args::CriteriaPredicate as CP;
2093        let sig = match pred {
2094            CP::Gt(n) => format!("GT:{n}"),
2095            CP::Ge(n) => format!("GE:{n}"),
2096            CP::Lt(n) => format!("LT:{n}"),
2097            CP::Le(n) => format!("LE:{n}"),
2098            CP::Eq(v) => format!("EQ:{}", Self::fmt_lit(v)),
2099            CP::Ne(v) => format!("NE:{}", Self::fmt_lit(v)),
2100            CP::TextLike {
2101                pattern,
2102                case_insensitive,
2103            } => {
2104                let mut lp = pattern.replace('*', "%").replace('?', "_");
2105                if *case_insensitive {
2106                    lp = lp.to_ascii_lowercase();
2107                }
2108                format!("LIKE:{}:{}", if *case_insensitive { 'i' } else { 's' }, lp)
2109            }
2110            CP::IsBlank | CP::IsNumber | CP::IsText | CP::IsLogical => return None,
2111        };
2112        Some(format!("s{sheet_id}:c{abs_col}:r{sr}-{er}:{sig}"))
2113    }
2114
2115    fn fmt_lit(v: &formualizer_common::LiteralValue) -> String {
2116        use formualizer_common::LiteralValue as LV;
2117        match v {
2118            LV::Number(n) => format!("N:{n}"),
2119            LV::Int(i) => format!("I:{i}"),
2120            LV::Text(s) => format!("T:{}", s.to_ascii_lowercase()),
2121            LV::Boolean(b) => format!("B:{}", if *b { 1 } else { 0 }),
2122            LV::Empty => "E".to_string(),
2123            LV::Error(_) => "ERR".to_string(),
2124            LV::Date(_)
2125            | LV::DateTime(_)
2126            | LV::Time(_)
2127            | LV::Duration(_)
2128            | LV::Array(_)
2129            | LV::Pending => "UNSUP".to_string(),
2130        }
2131    }
2132
2133    fn get_or_compute(
2134        &mut self,
2135        engine: &Engine<impl crate::traits::EvaluationContext>,
2136        view: &crate::arrow_store::ArrowRangeView<'_>,
2137        col_in_view: usize,
2138        pred: &crate::args::CriteriaPredicate,
2139    ) -> Option<std::sync::Arc<arrow_array::BooleanArray>> {
2140        let key = self.key_for(engine, view, col_in_view, pred)?;
2141        if let Some(m) = self.map.get(&key) {
2142            self.hits = self.hits.saturating_add(1);
2143            return Some(m.clone());
2144        }
2145        self.misses = self.misses.saturating_add(1);
2146        let mask = Self::compute_mask(engine, view, col_in_view, pred)?;
2147        // Insert with FIFO eviction
2148        if self.map.len() >= self.cap && !self.map.contains_key(&key) {
2149            if let Some(old) = self.order.pop_front() {
2150                self.map.remove(&old);
2151            }
2152        }
2153        self.order.push_back(key.clone());
2154        self.map.insert(key, mask.clone());
2155        Some(mask)
2156    }
2157
2158    fn compute_mask(
2159        engine: &Engine<impl crate::traits::EvaluationContext>,
2160        view: &crate::arrow_store::ArrowRangeView<'_>,
2161        col_in_view: usize,
2162        pred: &crate::args::CriteriaPredicate,
2163    ) -> Option<std::sync::Arc<arrow_array::BooleanArray>> {
2164        use crate::compute_prelude::{boolean, cmp, concat_arrays};
2165        use arrow::compute::kernels::comparison::{ilike, nilike};
2166        use arrow_array::{
2167            Array as _, ArrayRef, Float64Array, StringArray, builder::BooleanBuilder,
2168        };
2169
2170        let (rows, _cols) = view.dims();
2171        // Build the criterion column arrays by concatenating slices for the single column
2172        let mut num_parts: Vec<std::sync::Arc<Float64Array>> = Vec::new();
2173        for (_rs, _rl, cols_seg) in view.numbers_slices() {
2174            if col_in_view < cols_seg.len() {
2175                num_parts.push(cols_seg[col_in_view].clone());
2176            }
2177        }
2178        let num_col: Option<std::sync::Arc<Float64Array>> = if num_parts.is_empty() {
2179            None
2180        } else if num_parts.len() == 1 {
2181            Some(num_parts.remove(0))
2182        } else {
2183            let anys: Vec<&dyn arrow_array::Array> = num_parts
2184                .iter()
2185                .map(|a| a.as_ref() as &dyn arrow_array::Array)
2186                .collect();
2187            let conc: ArrayRef = concat_arrays(&anys).ok()?;
2188            let fa = conc.as_any().downcast_ref::<Float64Array>()?.clone();
2189            Some(std::sync::Arc::new(fa))
2190        };
2191
2192        // Lowered text column for this view/column
2193        let lowered_texts: Option<std::sync::Arc<StringArray>> = {
2194            let cols = view.lowered_text_columns();
2195            if col_in_view >= cols.len() {
2196                None
2197            } else {
2198                let sa = cols[col_in_view]
2199                    .as_any()
2200                    .downcast_ref::<StringArray>()?
2201                    .clone();
2202                Some(std::sync::Arc::new(sa))
2203            }
2204        };
2205
2206        let out = match pred {
2207            crate::args::CriteriaPredicate::Gt(n) => {
2208                let col = num_col?;
2209                cmp::gt(col.as_ref(), &Float64Array::new_scalar(*n)).ok()?
2210            }
2211            crate::args::CriteriaPredicate::Ge(n) => {
2212                let col = num_col?;
2213                cmp::gt_eq(col.as_ref(), &Float64Array::new_scalar(*n)).ok()?
2214            }
2215            crate::args::CriteriaPredicate::Lt(n) => {
2216                let col = num_col?;
2217                cmp::lt(col.as_ref(), &Float64Array::new_scalar(*n)).ok()?
2218            }
2219            crate::args::CriteriaPredicate::Le(n) => {
2220                let col = num_col?;
2221                cmp::lt_eq(col.as_ref(), &Float64Array::new_scalar(*n)).ok()?
2222            }
2223            crate::args::CriteriaPredicate::Eq(v) => match v {
2224                formualizer_common::LiteralValue::Number(x) => {
2225                    let col = num_col?;
2226                    cmp::eq(col.as_ref(), &Float64Array::new_scalar(*x)).ok()?
2227                }
2228                formualizer_common::LiteralValue::Int(i) => {
2229                    let col = num_col?;
2230                    cmp::eq(col.as_ref(), &Float64Array::new_scalar(*i as f64)).ok()?
2231                }
2232                formualizer_common::LiteralValue::Text(t) => {
2233                    let col = lowered_texts?;
2234                    let pat = StringArray::new_scalar(t.to_ascii_lowercase());
2235                    let mut m = ilike(col.as_ref(), &pat).ok()?;
2236                    if t.is_empty() {
2237                        // Treat nulls as equal to empty string
2238                        let mut bb = BooleanBuilder::with_capacity(col.len());
2239                        for i in 0..col.len() {
2240                            bb.append_value(col.is_null(i));
2241                        }
2242                        let nulls = bb.finish();
2243                        m = boolean::or_kleene(&m, &nulls).ok()?;
2244                    }
2245                    m
2246                }
2247                _ => return None,
2248            },
2249            crate::args::CriteriaPredicate::Ne(v) => match v {
2250                formualizer_common::LiteralValue::Number(x) => {
2251                    let col = num_col?;
2252                    cmp::neq(col.as_ref(), &Float64Array::new_scalar(*x)).ok()?
2253                }
2254                formualizer_common::LiteralValue::Int(i) => {
2255                    let col = num_col?;
2256                    cmp::neq(col.as_ref(), &Float64Array::new_scalar(*i as f64)).ok()?
2257                }
2258                formualizer_common::LiteralValue::Text(t) => {
2259                    let col = lowered_texts?;
2260                    let pat = StringArray::new_scalar(t.to_ascii_lowercase());
2261                    nilike(col.as_ref(), &pat).ok()?
2262                }
2263                _ => return None,
2264            },
2265            crate::args::CriteriaPredicate::TextLike {
2266                pattern,
2267                case_insensitive,
2268            } => {
2269                let col = lowered_texts?;
2270                let p = if *case_insensitive {
2271                    pattern.to_ascii_lowercase()
2272                } else {
2273                    pattern.clone()
2274                };
2275                let lp = p.replace('*', "%").replace('?', "_");
2276                let pat = StringArray::new_scalar(lp);
2277                ilike(col.as_ref(), &pat).ok()?
2278            }
2279            _ => return None,
2280        };
2281        Some(std::sync::Arc::new(out))
2282    }
2283}
2284
2285#[cfg(test)]
2286impl<R> Engine<R>
2287where
2288    R: EvaluationContext,
2289{
2290    pub fn __mask_cache_stats(&self) -> (u64, u64, usize) {
2291        if let Ok(g) = self.mask_cache.read() {
2292            if let Some(c) = g.as_ref() {
2293                return (c.hits, c.misses, c.map.len());
2294            }
2295        }
2296        (0, 0, 0)
2297    }
2298    pub fn __mask_cache_clear(&self) {
2299        if let Ok(mut g) = self.mask_cache.write() {
2300            *g = None;
2301        }
2302    }
2303    pub fn __mask_cache_set_cap(&mut self, cap: usize) {
2304        if let Ok(mut g) = self.mask_cache.write() {
2305            let snap = self.data_snapshot_id();
2306            *g = Some(MaskCache::new(snap, cap));
2307        }
2308    }
2309}
2310
2311// Phase 2 shim: in-process spill manager delegating to current graph methods.
2312#[derive(Default)]
2313pub struct ShimSpillManager {
2314    region_locks: RegionLockManager,
2315    pub(crate) active_locks: rustc_hash::FxHashMap<VertexId, u64>,
2316}
2317
2318impl ShimSpillManager {
2319    pub(crate) fn reserve(
2320        &mut self,
2321        owner: VertexId,
2322        anchor_cell: CellRef,
2323        shape: SpillShape,
2324        _meta: SpillMeta,
2325    ) -> Result<(), ExcelError> {
2326        // Derive region from anchor + shape; enforce in-flight exclusivity only.
2327        let region = crate::engine::spill::Region {
2328            sheet_id: anchor_cell.sheet_id as u32,
2329            row_start: anchor_cell.coord.row,
2330            row_end: anchor_cell
2331                .coord
2332                .row
2333                .saturating_add(shape.rows)
2334                .saturating_sub(1),
2335            col_start: anchor_cell.coord.col,
2336            col_end: anchor_cell
2337                .coord
2338                .col
2339                .saturating_add(shape.cols)
2340                .saturating_sub(1),
2341        };
2342        match self.region_locks.reserve(region, owner) {
2343            Ok(id) => {
2344                if id != 0 {
2345                    self.active_locks.insert(owner, id);
2346                }
2347                Ok(())
2348            }
2349            Err(e) => Err(e),
2350        }
2351    }
2352
2353    pub(crate) fn commit_array(
2354        &mut self,
2355        graph: &mut DependencyGraph,
2356        anchor_vertex: VertexId,
2357        targets: &[CellRef],
2358        rows: Vec<Vec<LiteralValue>>,
2359    ) -> Result<(), ExcelError> {
2360        // Re-run plan on concrete targets before committing to respect blockers.
2361        let plan_res = graph.plan_spill_region(anchor_vertex, targets);
2362        if let Err(e) = plan_res {
2363            if let Some(id) = self.active_locks.remove(&anchor_vertex) {
2364                self.region_locks.release(id);
2365            }
2366            return Err(e);
2367        }
2368
2369        let commit_res = graph.commit_spill_region_atomic_with_fault(
2370            anchor_vertex,
2371            targets.to_vec(),
2372            rows,
2373            None,
2374        );
2375        if let Some(id) = self.active_locks.remove(&anchor_vertex) {
2376            self.region_locks.release(id);
2377        }
2378        commit_res.map(|_| ())
2379    }
2380
2381    /// Commit a spill and mirror all written cells into Arrow overlay via the owning engine.
2382    pub(crate) fn commit_array_with_overlay<R: EvaluationContext>(
2383        &mut self,
2384        engine: &mut Engine<R>,
2385        anchor_vertex: VertexId,
2386        targets: &[CellRef],
2387        rows: Vec<Vec<LiteralValue>>,
2388    ) -> Result<(), ExcelError> {
2389        // Re-run plan on concrete targets before committing to respect blockers.
2390        let plan_res = engine.graph.plan_spill_region(anchor_vertex, targets);
2391        if let Err(e) = plan_res {
2392            if let Some(id) = self.active_locks.remove(&anchor_vertex) {
2393                self.region_locks.release(id);
2394            }
2395            return Err(e);
2396        }
2397
2398        let commit_res = engine.graph.commit_spill_region_atomic_with_fault(
2399            anchor_vertex,
2400            targets.to_vec(),
2401            rows.clone(),
2402            None,
2403        );
2404        if let Some(id) = self.active_locks.remove(&anchor_vertex) {
2405            self.region_locks.release(id);
2406        }
2407        commit_res.map(|_| ())?;
2408
2409        // Mirror into Arrow overlay when enabled
2410        if engine.config.arrow_storage_enabled
2411            && engine.config.delta_overlay_enabled
2412            && engine.config.write_formula_overlay_enabled
2413        {
2414            // Expect targets to be a contiguous rectangle row-major starting at some anchor
2415            for (idx, cell) in targets.iter().enumerate() {
2416                let (r_off, c_off) = {
2417                    if rows.is_empty() || rows[0].is_empty() {
2418                        (0usize, 0usize)
2419                    } else {
2420                        let width = rows[0].len();
2421                        (idx / width, idx % width)
2422                    }
2423                };
2424                let v = rows
2425                    .get(r_off)
2426                    .and_then(|r| r.get(c_off))
2427                    .cloned()
2428                    .unwrap_or(LiteralValue::Empty);
2429                let sheet_name = engine.graph.sheet_name(cell.sheet_id).to_string();
2430                engine.mirror_value_to_overlay(&sheet_name, cell.coord.row, cell.coord.col, &v);
2431            }
2432        }
2433        Ok(())
2434    }
2435}
2436
2437// Implement the resolver traits for the Engine.
2438// This allows the interpreter to resolve references by querying the engine's graph.
2439impl<R> crate::traits::ReferenceResolver for Engine<R>
2440where
2441    R: EvaluationContext,
2442{
2443    fn resolve_cell_reference(
2444        &self,
2445        sheet: Option<&str>,
2446        row: u32,
2447        col: u32,
2448    ) -> Result<LiteralValue, ExcelError> {
2449        let sheet_name = sheet.unwrap_or("Sheet1"); // FIXME: This should use the current sheet context
2450        // Prefer engine's unified accessor which consults Arrow store for base values
2451        // and falls back to graph for formulas and stored values.
2452        if let Some(v) = self.get_cell_value(sheet_name, row, col) {
2453            Ok(v)
2454        } else {
2455            // Excel semantics: empty cell coerces to 0 in numeric contexts
2456            Ok(LiteralValue::Int(0))
2457        }
2458    }
2459}
2460
2461impl<R> crate::traits::RangeResolver for Engine<R>
2462where
2463    R: EvaluationContext,
2464{
2465    fn resolve_range_reference(
2466        &self,
2467        sheet: Option<&str>,
2468        sr: Option<u32>,
2469        sc: Option<u32>,
2470        er: Option<u32>,
2471        ec: Option<u32>,
2472    ) -> Result<Box<dyn crate::traits::Range>, ExcelError> {
2473        // For now, delegate range resolution to the external resolver.
2474        // A future optimization could be to handle this within the graph.
2475        self.resolver.resolve_range_reference(sheet, sr, sc, er, ec)
2476    }
2477}
2478
2479impl<R> crate::traits::NamedRangeResolver for Engine<R>
2480where
2481    R: EvaluationContext,
2482{
2483    fn resolve_named_range_reference(
2484        &self,
2485        name: &str,
2486    ) -> Result<Vec<Vec<LiteralValue>>, ExcelError> {
2487        self.resolver.resolve_named_range_reference(name)
2488    }
2489}
2490
2491impl<R> crate::traits::TableResolver for Engine<R>
2492where
2493    R: EvaluationContext,
2494{
2495    fn resolve_table_reference(
2496        &self,
2497        tref: &formualizer_parse::parser::TableReference,
2498    ) -> Result<Box<dyn crate::traits::Table>, ExcelError> {
2499        self.resolver.resolve_table_reference(tref)
2500    }
2501}
2502
2503// The Engine is a Resolver because it implements the constituent traits.
2504impl<R> crate::traits::Resolver for Engine<R> where R: EvaluationContext {}
2505
2506// The Engine provides functions by delegating to its internal resolver.
2507impl<R> crate::traits::FunctionProvider for Engine<R>
2508where
2509    R: EvaluationContext,
2510{
2511    fn get_function(
2512        &self,
2513        prefix: &str,
2514        name: &str,
2515    ) -> Option<std::sync::Arc<dyn crate::function::Function>> {
2516        self.resolver.get_function(prefix, name)
2517    }
2518}
2519
2520// Override EvaluationContext to provide thread pool access
2521impl<R> crate::traits::EvaluationContext for Engine<R>
2522where
2523    R: EvaluationContext,
2524{
2525    fn thread_pool(&self) -> Option<&Arc<rayon::ThreadPool>> {
2526        self.thread_pool.as_ref()
2527    }
2528
2529    fn cancellation_token(&self) -> Option<&std::sync::atomic::AtomicBool> {
2530        // Engine-wide cancellation is exposed via evaluate_all_cancellable APIs; default None here.
2531        None
2532    }
2533
2534    fn chunk_hint(&self) -> Option<usize> {
2535        // Use a simple heuristic from configuration (stripe width * height) as a default hint.
2536        let hint =
2537            (self.config.stripe_height as usize).saturating_mul(self.config.stripe_width as usize);
2538        Some(hint.clamp(1024, 1 << 20)) // clamp between 1K and ~1M
2539    }
2540
2541    fn volatile_level(&self) -> crate::traits::VolatileLevel {
2542        self.config.volatile_level
2543    }
2544
2545    fn workbook_seed(&self) -> u64 {
2546        self.config.workbook_seed
2547    }
2548
2549    fn recalc_epoch(&self) -> u64 {
2550        self.recalc_epoch
2551    }
2552
2553    fn used_rows_for_columns(
2554        &self,
2555        sheet: &str,
2556        start_col: u32,
2557        end_col: u32,
2558    ) -> Option<(u32, u32)> {
2559        // Prefer Arrow-backed used-region; fallback to graph if formulas intersect region
2560        let sheet_id = self.graph.sheet_id(sheet)?;
2561        let arrow_ok = self.sheet_store().sheet(sheet).is_some();
2562        if arrow_ok {
2563            if let Some(bounds) = self.arrow_used_row_bounds(sheet, start_col, end_col) {
2564                return Some(bounds);
2565            }
2566        }
2567        self.graph
2568            .used_row_bounds_for_columns(sheet_id, start_col, end_col)
2569    }
2570
2571    fn used_cols_for_rows(&self, sheet: &str, start_row: u32, end_row: u32) -> Option<(u32, u32)> {
2572        // Prefer Arrow-backed used-region; fallback to graph if formulas intersect region
2573        let sheet_id = self.graph.sheet_id(sheet)?;
2574        let arrow_ok = self.sheet_store().sheet(sheet).is_some();
2575        if arrow_ok {
2576            if let Some(bounds) = self.arrow_used_col_bounds(sheet, start_row, end_row) {
2577                return Some(bounds);
2578            }
2579        }
2580        self.graph
2581            .used_col_bounds_for_rows(sheet_id, start_row, end_row)
2582    }
2583
2584    fn sheet_bounds(&self, sheet: &str) -> Option<(u32, u32)> {
2585        let _ = self.graph.sheet_id(sheet)?;
2586        // Excel-like upper bounds; we expose something finite but large.
2587        // Backends may override with real bounds.
2588        Some((1_048_576, 16_384)) // 1048576 rows, 16384 cols (XFD)
2589    }
2590
2591    fn data_snapshot_id(&self) -> u64 {
2592        self.snapshot_id.load(std::sync::atomic::Ordering::Relaxed)
2593    }
2594
2595    fn backend_caps(&self) -> crate::traits::BackendCaps {
2596        crate::traits::BackendCaps {
2597            streaming: true,
2598            used_region: true,
2599            write: false,
2600            tables: false,
2601            async_stream: false,
2602        }
2603    }
2604
2605    // Flats removed
2606
2607    fn arrow_fastpath_enabled(&self) -> bool {
2608        self.config.arrow_fastpath_enabled
2609    }
2610
2611    fn date_system(&self) -> crate::engine::DateSystem {
2612        self.config.date_system
2613    }
2614    /// New: resolve a reference into a RangeView (Phase 2 API)
2615    fn resolve_range_view<'c>(
2616        &'c self,
2617        reference: &ReferenceType,
2618        current_sheet: &str,
2619    ) -> Result<RangeView<'c>, ExcelError> {
2620        match reference {
2621            ReferenceType::Range {
2622                sheet,
2623                start_row,
2624                start_col,
2625                end_row,
2626                end_col,
2627            } => {
2628                let sheet_name = sheet.as_deref().unwrap_or(current_sheet);
2629                let sheet_id = self
2630                    .graph
2631                    .sheet_id(sheet_name)
2632                    .ok_or(ExcelError::new(ExcelErrorKind::Ref))?;
2633
2634                let is_unbounded = start_row.is_none()
2635                    || end_row.is_none()
2636                    || start_col.is_none()
2637                    || end_col.is_none();
2638                let mut sr = *start_row;
2639                let mut sc = *start_col;
2640                let mut er = *end_row;
2641                let mut ec = *end_col;
2642
2643                if sr.is_none() && er.is_none() {
2644                    // Full-column reference: anchor at row 1
2645                    let scv = sc.unwrap_or(1);
2646                    let ecv = ec.unwrap_or(scv);
2647                    sr = Some(1);
2648                    if let Some((_, max_r)) = self.used_rows_for_columns(sheet_name, scv, ecv) {
2649                        er = Some(max_r);
2650                    } else if let Some((max_rows, _)) = self.sheet_bounds(sheet_name) {
2651                        er = Some(max_rows);
2652                    }
2653                }
2654                if sc.is_none() && ec.is_none() {
2655                    // Full-row reference: anchor at column 1
2656                    let srv = sr.unwrap_or(1);
2657                    let erv = er.unwrap_or(srv);
2658                    sc = Some(1);
2659                    if let Some((_, max_c)) = self.used_cols_for_rows(sheet_name, srv, erv) {
2660                        ec = Some(max_c);
2661                    } else if let Some((_, max_cols)) = self.sheet_bounds(sheet_name) {
2662                        ec = Some(max_cols);
2663                    }
2664                }
2665                if sr.is_some() && er.is_none() {
2666                    let scv = sc.unwrap_or(1);
2667                    let ecv = ec.unwrap_or(scv);
2668                    if let Some((_, max_r)) = self.used_rows_for_columns(sheet_name, scv, ecv) {
2669                        er = Some(max_r);
2670                    } else if let Some((max_rows, _)) = self.sheet_bounds(sheet_name) {
2671                        er = Some(max_rows);
2672                    }
2673                }
2674                if er.is_some() && sr.is_none() {
2675                    // Open start: anchor at row 1
2676                    sr = Some(1);
2677                }
2678                if sc.is_some() && ec.is_none() {
2679                    let srv = sr.unwrap_or(1);
2680                    let erv = er.unwrap_or(srv);
2681                    if let Some((_, max_c)) = self.used_cols_for_rows(sheet_name, srv, erv) {
2682                        ec = Some(max_c);
2683                    } else if let Some((_, max_cols)) = self.sheet_bounds(sheet_name) {
2684                        ec = Some(max_cols);
2685                    }
2686                }
2687                if ec.is_some() && sc.is_none() {
2688                    // Open start: anchor at column 1
2689                    sc = Some(1);
2690                }
2691
2692                let sr = sr.unwrap_or(1);
2693                let sc = sc.unwrap_or(1);
2694                let er = er.unwrap_or(sr.saturating_sub(1));
2695                let ec = ec.unwrap_or(sc.saturating_sub(1));
2696
2697                // Feature: Arrow-only RangeView resolution (no Hybrid/GraphSlice) when overlay mirroring is enabled.
2698                if self.config.arrow_storage_enabled
2699                    && self.config.delta_overlay_enabled
2700                    && self.config.write_formula_overlay_enabled
2701                {
2702                    if let Some(asheet) = self.sheet_store().sheet(sheet_name) {
2703                        let sr0 = sr.saturating_sub(1) as usize;
2704                        let sc0 = sc.saturating_sub(1) as usize;
2705                        let er0 = er.saturating_sub(1) as usize;
2706                        let ec0 = ec.saturating_sub(1) as usize;
2707                        let av = asheet.range_view(sr0, sc0, er0, ec0);
2708                        return Ok(RangeView::from_arrow(av));
2709                    }
2710                    // Materialize via graph when Arrow backend is absent.
2711                    let rows = (sr..=er)
2712                        .map(|r| {
2713                            (sc..=ec)
2714                                .map(|c| {
2715                                    let coord = Coord::new(r, c, true, true);
2716                                    let addr = CellRef::new(sheet_id, coord);
2717                                    self.graph
2718                                        .get_vertex_id_for_address(&addr)
2719                                        .and_then(|id| self.graph.get_value(*id))
2720                                        .unwrap_or(LiteralValue::Empty)
2721                                })
2722                                .collect::<Vec<_>>()
2723                        })
2724                        .collect::<Vec<_>>();
2725                    return Ok(RangeView::from_borrowed(Box::leak(Box::new(rows))));
2726                }
2727
2728                // Default behavior (no feature): prefer Hybrid when Arrow exists, otherwise GraphSlice
2729                if let Some(asheet) = self.sheet_store().sheet(sheet_name) {
2730                    let sr0 = sr.saturating_sub(1) as usize;
2731                    let sc0 = sc.saturating_sub(1) as usize;
2732                    let er0 = er.saturating_sub(1) as usize;
2733                    let ec0 = ec.saturating_sub(1) as usize;
2734                    let av = asheet.range_view(sr0, sc0, er0, ec0);
2735                    return Ok(RangeView::from_hybrid(&self.graph, sheet_id, sr, sc, av));
2736                }
2737                Ok(RangeView::from_graph(&self.graph, sheet_id, sr, sc, er, ec))
2738            }
2739            ReferenceType::Cell { sheet, row, col } => {
2740                let sheet_name = sheet.as_deref().unwrap_or(current_sheet);
2741                if let Some(sheet_id) = self.graph.sheet_id(sheet_name) {
2742                    let coord = Coord::new(*row, *col, true, true);
2743                    let addr = CellRef::new(sheet_id, coord);
2744                    if let Some(vid) = self.graph.get_vertex_id_for_address(&addr) {
2745                        if matches!(
2746                            self.graph.get_vertex_kind(*vid),
2747                            VertexKind::FormulaScalar | VertexKind::FormulaArray
2748                        ) {
2749                            if let Some(v) = self.graph.get_cell_value(sheet_name, *row, *col) {
2750                                return Ok(RangeView::from_borrowed(Box::leak(Box::new(vec![
2751                                    vec![v],
2752                                ]))));
2753                            }
2754                        }
2755                    }
2756                }
2757                if self.config.arrow_storage_enabled
2758                    && self.config.delta_overlay_enabled
2759                    && self.config.write_formula_overlay_enabled
2760                {
2761                    if let Some(asheet) = self.sheet_store().sheet(sheet_name) {
2762                        let r0 = row.saturating_sub(1) as usize;
2763                        let c0 = col.saturating_sub(1) as usize;
2764                        let av = asheet.range_view(r0, c0, r0, c0);
2765                        let v = av.get_cell(0, 0);
2766                        return Ok(RangeView::from_borrowed(Box::leak(Box::new(vec![vec![v]]))));
2767                    }
2768                    // No Arrow: use graph value if present
2769                    let v = self
2770                        .graph
2771                        .get_cell_value(sheet_name, *row, *col)
2772                        .unwrap_or(LiteralValue::Empty);
2773                    return Ok(RangeView::from_borrowed(Box::leak(Box::new(vec![vec![v]]))));
2774                }
2775
2776                // Default behavior
2777                if let Some(v) = self.graph.get_cell_value(sheet_name, *row, *col) {
2778                    return Ok(RangeView::from_borrowed(Box::leak(Box::new(vec![vec![v]]))));
2779                }
2780                if let Some(asheet) = self.sheet_store().sheet(sheet_name) {
2781                    let r0 = row.saturating_sub(1) as usize;
2782                    let c0 = col.saturating_sub(1) as usize;
2783                    let av = asheet.range_view(r0, c0, r0, c0);
2784                    let v = av.get_cell(0, 0);
2785                    return Ok(RangeView::from_borrowed(Box::leak(Box::new(vec![vec![v]]))));
2786                }
2787                Ok(RangeView::from_borrowed(Box::leak(Box::new(vec![vec![
2788                    LiteralValue::Empty,
2789                ]]))))
2790            }
2791            ReferenceType::NamedRange(name) => {
2792                let data = self.resolver.resolve_named_range_reference(name)?;
2793                Ok(RangeView::from_borrowed(Box::leak(Box::new(data))))
2794            }
2795            ReferenceType::Table(tref) => {
2796                // Materialize via Resolver::resolve_range_like tranche 1
2797                let boxed = self.resolve_range_like(&ReferenceType::Table(tref.clone()))?;
2798                {
2799                    let owned = boxed.materialise().into_owned();
2800                    Ok(RangeView::from_borrowed(Box::leak(Box::new(owned))))
2801                }
2802            }
2803        }
2804    }
2805
2806    fn build_criteria_mask(
2807        &self,
2808        view: &crate::arrow_store::ArrowRangeView<'_>,
2809        col_in_view: usize,
2810        pred: &crate::args::CriteriaPredicate,
2811    ) -> Option<std::sync::Arc<arrow_array::BooleanArray>> {
2812        if view.dims().0 == 0 || view.dims().1 == 0 {
2813            return None;
2814        }
2815        let snap = self.data_snapshot_id();
2816        let mut guard = self.mask_cache.write().ok()?;
2817        let cap = self.config.warmup.mask_cache_entries_cap;
2818        let cache = guard.get_or_insert_with(|| MaskCache::new(snap, cap));
2819        if cache.snapshot != snap {
2820            cache.clear_and_set_snapshot(snap);
2821        }
2822        cache.get_or_compute(self, view, col_in_view, pred)
2823    }
2824}
2825
2826impl<R> Engine<R>
2827where
2828    R: EvaluationContext,
2829{
2830    /// Helper: commit spill via shim and mirror resulting cells into Arrow overlay when enabled.
2831    fn commit_spill_and_mirror(
2832        &mut self,
2833        anchor_vertex: VertexId,
2834        targets: &[CellRef],
2835        rows: Vec<Vec<LiteralValue>>,
2836    ) -> Result<(), ExcelError> {
2837        // Commit via shim (releases locks)
2838        self.spill_mgr
2839            .commit_array(&mut self.graph, anchor_vertex, targets, rows.clone())?;
2840
2841        if self.config.arrow_storage_enabled
2842            && self.config.delta_overlay_enabled
2843            && self.config.write_formula_overlay_enabled
2844        {
2845            for (idx, cell) in targets.iter().enumerate() {
2846                if rows.is_empty() || rows[0].is_empty() {
2847                    break;
2848                }
2849                let width = rows[0].len();
2850                let r_off = idx / width;
2851                let c_off = idx % width;
2852                let v = rows[r_off][c_off].clone();
2853                let sheet_name = self.graph.sheet_name(cell.sheet_id).to_string();
2854                self.mirror_value_to_overlay(&sheet_name, cell.coord.row, cell.coord.col, &v);
2855            }
2856        }
2857        Ok(())
2858    }
2859}