formualizer_eval/engine/
ingest_builder.rs

1use crate::engine::graph::DependencyGraph;
2use formualizer_common::Coord as AbsCoord;
3// use crate::engine::plan::RangeKey; // no longer needed directly here
4use crate::engine::EvalConfig;
5use crate::{SheetId, engine::vertex::VertexId};
6use formualizer_common::{ExcelError, LiteralValue};
7use formualizer_parse::parser::{ASTNode, CollectPolicy};
8use rustc_hash::FxHashMap;
9
/// Summary of a bulk ingest, returned by `BulkIngestBuilder::finish`.
#[derive(Debug, Clone)]
pub struct BulkIngestSummary {
    /// Sheet count for the ingest, as filled in by `BulkIngestBuilder::finish`.
    pub sheets: usize,
    /// Sum, over all sheets, of the number of entries returned by the graph's
    /// `ensure_vertices_batch` (presumably the vertices newly created — TODO confirm).
    pub vertices: usize,
    /// Number of formula targets that were assigned a stored AST.
    pub formulas: usize,
    /// Number of direct cell dependencies resolved to a vertex. Range and
    /// named-reference dependencies are not included in this count.
    pub edges: usize,
    /// Wall-clock time spent inside `finish`.
    pub elapsed: std::time::Duration,
}
19
/// Staged (not yet materialized) contents of a single sheet.
struct SheetStage {
    /// Sheet name, used when handing data to the graph by name.
    name: String,
    /// Sheet id; also the key under which this stage is stored in the builder.
    id: SheetId,
    /// Staged literal cells as `(r, c, value)` — presumably row and column; TODO confirm base.
    values: Vec<(u32, u32, LiteralValue)>,
    /// Staged formulas as `(r, c, ast, volatile)`; the flag is computed when the formula is added.
    formulas: Vec<(u32, u32, ASTNode, bool)>, // volatile flag
}
26
27impl SheetStage {
28    fn new(name: String, id: SheetId) -> Self {
29        Self {
30            name,
31            id,
32            values: Vec::new(),
33            formulas: Vec::new(),
34        }
35    }
36}
37
/// Stages values and formulas per sheet and materializes them into a
/// `DependencyGraph` in one pass when `finish` is called.
pub struct BulkIngestBuilder<'g> {
    /// Graph that receives the staged data.
    g: &'g mut DependencyGraph,
    /// Staged per-sheet data, keyed by sheet id.
    sheets: FxHashMap<SheetId, SheetStage>,
    /// Clone of the graph config taken at construction; `finish` re-applies its
    /// `sheet_index_mode` to the graph.
    cfg_saved: EvalConfig,
    /// Scratch buffer of per-formula volatile flags, cleared and refilled for each sheet in `finish`.
    vols_buf: Vec<bool>,
}
44
45impl<'g> BulkIngestBuilder<'g> {
46    pub fn new(g: &'g mut DependencyGraph) -> Self {
47        let cfg_saved = g.get_config().clone();
48        // Respect current sheet index mode (loader may set Lazy to skip index work during ingest)
49        Self {
50            g,
51            sheets: FxHashMap::default(),
52            cfg_saved,
53            vols_buf: Vec::new(),
54        }
55    }
56
57    pub fn add_sheet(&mut self, name: &str) -> SheetId {
58        let id = self.g.sheet_id_mut(name);
59        self.sheets
60            .entry(id)
61            .or_insert_with(|| SheetStage::new(name.to_string(), id));
62        id
63    }
64
65    pub fn add_values<I>(&mut self, sheet: SheetId, values: I)
66    where
67        I: IntoIterator<Item = (u32, u32, LiteralValue)>,
68    {
69        let stage = self
70            .sheets
71            .entry(sheet)
72            .or_insert_with(|| SheetStage::new(self.g.sheet_name(sheet).to_string(), sheet));
73        stage.values.extend(values);
74    }
75
76    pub fn add_formulas<I>(&mut self, sheet: SheetId, formulas: I)
77    where
78        I: IntoIterator<Item = (u32, u32, ASTNode)>,
79    {
80        let stage = self
81            .sheets
82            .entry(sheet)
83            .or_insert_with(|| SheetStage::new(self.g.sheet_name(sheet).to_string(), sheet));
84        for (r, c, ast) in formulas {
85            let vol = Self::is_ast_volatile(&ast);
86            stage.formulas.push((r, c, ast, vol));
87        }
88    }
89
90    fn is_ast_volatile(ast: &ASTNode) -> bool {
91        use formualizer_parse::parser::ASTNodeType;
92
93        if ast.contains_volatile() {
94            return true;
95        }
96
97        match &ast.node_type {
98            ASTNodeType::Function { name, args } => {
99                if let Some(func) = crate::function_registry::get("", name)
100                    && func.caps().contains(crate::function::FnCaps::VOLATILE)
101                {
102                    return true;
103                }
104                args.iter().any(Self::is_ast_volatile)
105            }
106            ASTNodeType::BinaryOp { left, right, .. } => {
107                Self::is_ast_volatile(left) || Self::is_ast_volatile(right)
108            }
109            ASTNodeType::UnaryOp { expr, .. } => Self::is_ast_volatile(expr),
110            ASTNodeType::Array(rows) => {
111                rows.iter().any(|row| row.iter().any(Self::is_ast_volatile))
112            }
113            _ => false,
114        }
115    }
116
117    pub fn finish(mut self) -> Result<BulkIngestSummary, ExcelError> {
118        use web_time::Instant;
119        let t0 = Instant::now();
120        let dbg = std::env::var("FZ_DEBUG_INGEST")
121            .ok()
122            .is_some_and(|v| v != "0")
123            || std::env::var("FZ_DEBUG_LOAD")
124                .ok()
125                .is_some_and(|v| v != "0");
126        let mut total_vertices = 0usize;
127        let mut total_formulas = 0usize;
128        let mut total_edges = 0usize;
129
130        if dbg {
131            eprintln!(
132                "[fz][ingest] starting bulk ingest with {} sheets",
133                self.sheets.len()
134            );
135        }
136
137        // Materialize per-sheet to keep caches warm and reduce cross-sheet churn
138        // Accumulate a flat adjacency for a single-shot CSR build
139        let mut edges_adj: Vec<(u32, Vec<u32>)> = Vec::new();
140        let mut coord_accum: Vec<AbsCoord> = Vec::new();
141        let mut id_accum: Vec<u32> = Vec::new();
142        for (_sid, stage) in self.sheets.drain() {
143            let t_sheet0 = Instant::now();
144            let mut t_values_ms = 0u128;
145            let mut t_plan_ms = 0u128;
146            let mut t_ensure_ms = 0u128;
147            let mut t_assign_ms = 0u128;
148            let mut t_edges_ms = 0u128;
149            let mut t_ranges_ms = 0u128;
150            let mut n_targets = 0usize;
151            let mut n_globals = 0usize;
152            let mut n_cell_deps = 0usize;
153            let mut n_range_deps = 0usize;
154            if dbg {
155                eprintln!("[fz][ingest] sheet '{}' begin", stage.name);
156            }
157            // 1) Ensure/allocate values quickly (reuse existing bulk path)
158            if !stage.values.is_empty() {
159                let tv0 = Instant::now();
160
161                self.g
162                    .bulk_insert_values(&stage.name, stage.values.into_iter());
163                t_values_ms = tv0.elapsed().as_millis();
164            }
165
166            // 2) Build plan for formulas on this sheet
167            if !stage.formulas.is_empty() {
168                let tp0 = Instant::now();
169                let refs = stage
170                    .formulas
171                    .iter()
172                    .map(|(r, c, ast, _)| (stage.name.as_str(), *r, *c, ast));
173                // Reuse volatility buffer
174                self.vols_buf.clear();
175                self.vols_buf.reserve(stage.formulas.len());
176                for &(_, _, _, v) in &stage.formulas {
177                    self.vols_buf.push(v);
178                }
179                let policy = CollectPolicy {
180                    expand_small_ranges: true,
181                    range_expansion_limit: self.g.range_expansion_limit(),
182                    include_names: true,
183                };
184                let plan = self
185                    .g
186                    .plan_dependencies(refs, &policy, Some(&self.vols_buf))?;
187                // Reserve adjacency rows capacity upfront for this sheet
188                edges_adj.reserve(plan.formula_targets.len());
189                t_plan_ms = tp0.elapsed().as_millis();
190                n_targets = plan.formula_targets.len();
191                n_globals = plan.global_cells.len();
192
193                // 3) Ensure targets and referenced cells exist using batch allocation when missing
194                // Union of targets and global_cells (dedup to cut redundant lookups)
195                let mut all_coords: Vec<(SheetId, AbsCoord)> =
196                    Vec::with_capacity(plan.formula_targets.len() + plan.global_cells.len());
197                all_coords.extend(plan.formula_targets.iter().cloned());
198                all_coords.extend(plan.global_cells.iter().cloned());
199                // Deduplicate by (SheetId, AbsCoord)
200                let mut seen: rustc_hash::FxHashSet<(SheetId, AbsCoord)> =
201                    rustc_hash::FxHashSet::default();
202                all_coords.retain(|tpl| seen.insert(*tpl));
203
204                // Ensure vertices in batch and also track coords/ids for CSR rebuild
205                let te0 = Instant::now();
206                let add_batch = self.g.ensure_vertices_batch(&all_coords);
207                total_vertices += add_batch.len();
208                if !add_batch.is_empty() {
209                    for (pc, id) in &add_batch {
210                        coord_accum.push(*pc);
211                        id_accum.push(*id);
212                    }
213                }
214                t_ensure_ms = te0.elapsed().as_millis();
215
216                // 4) Store ASTs and set kinds/dirty/volatile; map targets to vids
217                let ta0 = Instant::now();
218                let ast_ids = self
219                    .g
220                    .store_asts_batch(stage.formulas.iter().map(|(_, _, ast, _)| ast));
221
222                let mut target_vids: Vec<VertexId> = Vec::with_capacity(plan.formula_targets.len());
223                for (i, (sid, pc)) in plan.formula_targets.iter().enumerate() {
224                    let vid = self.g.vid_for_sid_pc(*sid, *pc).expect("VID must exist");
225                    target_vids.push(vid);
226                    // Remove old edges if replacing a formula
227                    self.g
228                        .assign_formula_vertex(vid, ast_ids[i], stage.formulas[i].3);
229                }
230                total_formulas += target_vids.len();
231                t_assign_ms = ta0.elapsed().as_millis();
232
233                // 5) Collect edges into adjacency rows for a later one-shot CSR build
234                let ted0 = Instant::now();
235                for (fi, &tvid) in target_vids.iter().enumerate() {
236                    // Use SmallVec to avoid heap allocs for small dependency counts
237                    let mut row: smallvec::SmallVec<[u32; 8]> = smallvec::SmallVec::new();
238                    if let Some(indices) = plan.per_formula_cells.get(fi) {
239                        let mut dep_count = 0usize;
240                        row.reserve(indices.len());
241                        for &idx in indices {
242                            if let Some(dep_vid) = self.g.vid_for_plan_idx(&plan, idx) {
243                                row.push(dep_vid.0);
244                                dep_count += 1;
245                            }
246                        }
247                        total_edges += dep_count;
248                        n_cell_deps += dep_count;
249                    }
250
251                    // Range deps via direct RangeKey path
252                    let tr0 = Instant::now();
253                    if let Some(rks) = plan.per_formula_ranges.get(fi) {
254                        n_range_deps += rks.len();
255                        self.g.add_range_deps_from_keys(tvid, rks, stage.id);
256                    }
257                    t_ranges_ms += tr0.elapsed().as_millis();
258                    if let Some(names) = plan.per_formula_names.get(fi)
259                        && !names.is_empty()
260                    {
261                        let mut name_vertices = Vec::new();
262                        let (formula_sheet, _) = plan
263                            .formula_targets
264                            .get(fi)
265                            .copied()
266                            .unwrap_or((stage.id, AbsCoord::new(1, 1)));
267                        for name in names {
268                            if let Some(named) = self.g.resolve_name_entry(name, formula_sheet) {
269                                row.push(named.vertex.0);
270                                name_vertices.push(named.vertex);
271                            } else {
272                                self.g
273                                    .record_pending_name_reference(formula_sheet, name, tvid);
274                            }
275                        }
276                        if !name_vertices.is_empty() {
277                            self.g.attach_vertex_to_names(tvid, &name_vertices);
278                        }
279                    }
280                    // Always add adjacency row for target (may be empty)
281                    edges_adj.push((tvid.0, row.into_vec()));
282                }
283                t_edges_ms = ted0.elapsed().as_millis();
284            }
285            if dbg {
286                eprintln!(
287                    "[fz][ingest] sheet '{}' done: values={}ms plan={}ms ensure={}ms assign={}ms edges={}ms ranges={}ms targets={} globals={} cell_deps={} range_groups={} total={}ms",
288                    stage.name,
289                    t_values_ms,
290                    t_plan_ms,
291                    t_ensure_ms,
292                    t_assign_ms,
293                    t_edges_ms,
294                    t_ranges_ms,
295                    n_targets,
296                    n_globals,
297                    n_cell_deps,
298                    n_range_deps,
299                    t_sheet0.elapsed().as_millis()
300                );
301            }
302        }
303        if dbg {
304            eprintln!("[fz][ingest] beginning finalize");
305        }
306
307        // Finalize: pick strategy based on graph size and number of edge rows
308        if !edges_adj.is_empty() {
309            let rows = edges_adj.len();
310            let total_vertices_now = self.g.vertex_count();
311            let t_fin0 = Instant::now();
312            if dbg {
313                eprintln!(
314                    "[fz][ingest] finalize: start rows={rows}, vertices={total_vertices_now}"
315                );
316            }
317            // Heuristic: avoid one-shot CSR when vertices are huge and rows are sparse
318            let sparse_vs_huge =
319                total_vertices_now > 800_000 && (rows as f64) / (total_vertices_now as f64) < 0.05;
320            if sparse_vs_huge {
321                let t_delta0 = Instant::now();
322                if dbg {
323                    eprintln!("[fz][ingest] finalize: using delta path (begin)");
324                }
325                self.g.begin_batch();
326                for (tvid_raw, row) in &edges_adj {
327                    let tvid = crate::engine::vertex::VertexId(*tvid_raw);
328                    if !row.is_empty() {
329                        let deps: Vec<crate::engine::vertex::VertexId> = row
330                            .iter()
331                            .map(|d| crate::engine::vertex::VertexId(*d))
332                            .collect();
333                        self.g.add_edges_nobatch(tvid, &deps);
334                    }
335                }
336                self.g.end_batch();
337                if dbg {
338                    eprintln!(
339                        "[fz][ingest] finalize: delta done in {} ms (total {} ms)",
340                        t_delta0.elapsed().as_millis(),
341                        t_fin0.elapsed().as_millis()
342                    );
343                }
344            } else {
345                // One-shot CSR build from accumulated adjacency and coords/ids
346                let mut t_coords_ms = 0u128;
347                if coord_accum.is_empty() || id_accum.is_empty() {
348                    if dbg {
349                        eprintln!("[fz][ingest] finalize: gathering coords/ids");
350                    }
351                    let t_coords0 = Instant::now();
352                    for vid in self.g.iter_vertex_ids() {
353                        coord_accum.push(self.g.vertex_coord(vid));
354                        id_accum.push(vid.0);
355                    }
356                    t_coords_ms = t_coords0.elapsed().as_millis();
357                }
358                if dbg {
359                    eprintln!("[fz][ingest] finalize: building CSR");
360                }
361                let t_csr0 = Instant::now();
362                self.g
363                    .build_edges_from_adjacency(edges_adj, coord_accum, id_accum);
364                if dbg {
365                    eprintln!(
366                        "[fz][ingest] finalize: rows={}, gather_coords={} ms, csr_build={} ms, total={} ms",
367                        rows,
368                        t_coords_ms,
369                        t_csr0.elapsed().as_millis(),
370                        t_fin0.elapsed().as_millis()
371                    );
372                }
373            }
374        }
375
376        // Restore config
377        self.g.set_sheet_index_mode(self.cfg_saved.sheet_index_mode);
378        Ok(BulkIngestSummary {
379            sheets: 0, // could populate later
380            vertices: total_vertices,
381            formulas: total_formulas,
382            edges: total_edges,
383            elapsed: t0.elapsed(),
384        })
385    }
386}