formualizer_eval/engine/
ingest_builder.rs

1use crate::engine::graph::DependencyGraph;
2use crate::engine::packed_coord::PackedCoord;
3// use crate::engine::plan::RangeKey; // no longer needed directly here
4use crate::engine::EvalConfig;
5use crate::{SheetId, engine::vertex::VertexId};
6use formualizer_common::{ExcelError, LiteralValue};
7use formualizer_parse::parser::{ASTNode, CollectPolicy};
8use rustc_hash::FxHashMap;
9
/// Counters and wall-clock timing reported by a completed bulk ingest.
#[derive(Debug, Clone)]
pub struct BulkIngestSummary {
    /// Sheet count reported for the ingest.
    pub sheets: usize,
    /// Vertex count reported for the ingest.
    pub vertices: usize,
    /// Number of formula cells that were assigned to vertices.
    pub formulas: usize,
    /// Number of dependency edges that were recorded.
    pub edges: usize,
    /// Total time spent inside the ingest.
    pub elapsed: std::time::Duration,
}
19
20struct SheetStage {
21    name: String,
22    id: SheetId,
23    values: Vec<(u32, u32, LiteralValue)>,
24    formulas: Vec<(u32, u32, ASTNode, bool)>, // volatile flag
25}
26
27impl SheetStage {
28    fn new(name: String, id: SheetId) -> Self {
29        Self {
30            name,
31            id,
32            values: Vec::new(),
33            formulas: Vec::new(),
34        }
35    }
36}
37
38pub struct BulkIngestBuilder<'g> {
39    g: &'g mut DependencyGraph,
40    sheets: FxHashMap<SheetId, SheetStage>,
41    cfg_saved: EvalConfig,
42    vols_buf: Vec<bool>,
43}
44
45impl<'g> BulkIngestBuilder<'g> {
46    pub fn new(g: &'g mut DependencyGraph) -> Self {
47        let cfg_saved = g.get_config().clone();
48        // Respect current sheet index mode (loader may set Lazy to skip index work during ingest)
49        Self {
50            g,
51            sheets: FxHashMap::default(),
52            cfg_saved,
53            vols_buf: Vec::new(),
54        }
55    }
56
57    pub fn add_sheet(&mut self, name: &str) -> SheetId {
58        let id = self.g.sheet_id_mut(name);
59        self.sheets
60            .entry(id)
61            .or_insert_with(|| SheetStage::new(name.to_string(), id));
62        id
63    }
64
65    pub fn add_values<I>(&mut self, sheet: SheetId, values: I)
66    where
67        I: IntoIterator<Item = (u32, u32, LiteralValue)>,
68    {
69        let stage = self
70            .sheets
71            .entry(sheet)
72            .or_insert_with(|| SheetStage::new(self.g.sheet_name(sheet).to_string(), sheet));
73        stage.values.extend(values);
74    }
75
76    pub fn add_formulas<I>(&mut self, sheet: SheetId, formulas: I)
77    where
78        I: IntoIterator<Item = (u32, u32, ASTNode)>,
79    {
80        let stage = self
81            .sheets
82            .entry(sheet)
83            .or_insert_with(|| SheetStage::new(self.g.sheet_name(sheet).to_string(), sheet));
84        for (r, c, ast) in formulas {
85            let vol = ast.contains_volatile();
86            stage.formulas.push((r, c, ast, vol));
87        }
88    }
89
90    pub fn finish(mut self) -> Result<BulkIngestSummary, ExcelError> {
91        use std::time::Instant;
92        let t0 = Instant::now();
93        let dbg = std::env::var("FZ_DEBUG_INGEST")
94            .ok()
95            .is_some_and(|v| v != "0")
96            || std::env::var("FZ_DEBUG_LOAD")
97                .ok()
98                .is_some_and(|v| v != "0");
99        let mut total_vertices = 0usize;
100        let mut total_formulas = 0usize;
101        let mut total_edges = 0usize;
102
103        if dbg {
104            eprintln!(
105                "[fz][ingest] starting bulk ingest with {} sheets",
106                self.sheets.len()
107            );
108        }
109
110        // Materialize per-sheet to keep caches warm and reduce cross-sheet churn
111        // Accumulate a flat adjacency for a single-shot CSR build
112        let mut edges_adj: Vec<(u32, Vec<u32>)> = Vec::new();
113        let mut coord_accum: Vec<PackedCoord> = Vec::new();
114        let mut id_accum: Vec<u32> = Vec::new();
115        for (_sid, stage) in self.sheets.drain() {
116            let t_sheet0 = Instant::now();
117            let mut t_values_ms = 0u128;
118            let mut t_plan_ms = 0u128;
119            let mut t_ensure_ms = 0u128;
120            let mut t_assign_ms = 0u128;
121            let mut t_edges_ms = 0u128;
122            let mut t_ranges_ms = 0u128;
123            let mut n_targets = 0usize;
124            let mut n_globals = 0usize;
125            let mut n_cell_deps = 0usize;
126            let mut n_range_deps = 0usize;
127            if dbg {
128                eprintln!("[fz][ingest] sheet '{}' begin", stage.name);
129            }
130            // 1) Ensure/allocate values quickly (reuse existing bulk path)
131            if !stage.values.is_empty() {
132                let tv0 = Instant::now();
133
134                self.g
135                    .bulk_insert_values(&stage.name, stage.values.into_iter());
136                t_values_ms = tv0.elapsed().as_millis();
137            }
138
139            // 2) Build plan for formulas on this sheet
140            if !stage.formulas.is_empty() {
141                let tp0 = Instant::now();
142                let refs = stage
143                    .formulas
144                    .iter()
145                    .map(|(r, c, ast, _)| (stage.name.as_str(), *r, *c, ast));
146                // Reuse volatility buffer
147                self.vols_buf.clear();
148                self.vols_buf.reserve(stage.formulas.len());
149                for &(_, _, _, v) in &stage.formulas {
150                    self.vols_buf.push(v);
151                }
152                let policy = CollectPolicy {
153                    expand_small_ranges: true,
154                    range_expansion_limit: self.g.range_expansion_limit(),
155                    include_names: true,
156                };
157                let plan = self
158                    .g
159                    .plan_dependencies(refs, &policy, Some(&self.vols_buf))?;
160                // Reserve adjacency rows capacity upfront for this sheet
161                edges_adj.reserve(plan.formula_targets.len());
162                t_plan_ms = tp0.elapsed().as_millis();
163                n_targets = plan.formula_targets.len();
164                n_globals = plan.global_cells.len();
165
166                // 3) Ensure targets and referenced cells exist using batch allocation when missing
167                // Union of targets and global_cells (dedup to cut redundant lookups)
168                let mut all_coords: Vec<(SheetId, PackedCoord)> =
169                    Vec::with_capacity(plan.formula_targets.len() + plan.global_cells.len());
170                all_coords.extend(plan.formula_targets.iter().cloned());
171                all_coords.extend(plan.global_cells.iter().cloned());
172                // Deduplicate by (SheetId, PackedCoord)
173                let mut seen: rustc_hash::FxHashSet<(SheetId, PackedCoord)> =
174                    rustc_hash::FxHashSet::default();
175                all_coords.retain(|tpl| seen.insert(*tpl));
176
177                // Ensure vertices in batch and also track coords/ids for CSR rebuild
178                let te0 = Instant::now();
179                let add_batch = self.g.ensure_vertices_batch(&all_coords);
180                total_vertices += add_batch.len();
181                if !add_batch.is_empty() {
182                    for (pc, id) in &add_batch {
183                        coord_accum.push(*pc);
184                        id_accum.push(*id);
185                    }
186                }
187                t_ensure_ms = te0.elapsed().as_millis();
188
189                // 4) Store ASTs and set kinds/dirty/volatile; map targets to vids
190                let ta0 = Instant::now();
191                let ast_ids = self
192                    .g
193                    .store_asts_batch(stage.formulas.iter().map(|(_, _, ast, _)| ast));
194
195                let mut target_vids: Vec<VertexId> = Vec::with_capacity(plan.formula_targets.len());
196                for (i, (sid, pc)) in plan.formula_targets.iter().enumerate() {
197                    let vid = self.g.vid_for_sid_pc(*sid, *pc).expect("VID must exist");
198                    target_vids.push(vid);
199                    // Remove old edges if replacing a formula
200                    self.g
201                        .assign_formula_vertex(vid, ast_ids[i], stage.formulas[i].3);
202                }
203                total_formulas += target_vids.len();
204                t_assign_ms = ta0.elapsed().as_millis();
205
206                // 5) Collect edges into adjacency rows for a later one-shot CSR build
207                let ted0 = Instant::now();
208                for (fi, &tvid) in target_vids.iter().enumerate() {
209                    // Use SmallVec to avoid heap allocs for small dependency counts
210                    let mut row: smallvec::SmallVec<[u32; 8]> = smallvec::SmallVec::new();
211                    if let Some(indices) = plan.per_formula_cells.get(fi) {
212                        let mut dep_count = 0usize;
213                        row.reserve(indices.len());
214                        for &idx in indices {
215                            if let Some(dep_vid) = self.g.vid_for_plan_idx(&plan, idx) {
216                                row.push(dep_vid.0);
217                                dep_count += 1;
218                            }
219                        }
220                        total_edges += dep_count;
221                        n_cell_deps += dep_count;
222                    }
223
224                    // Range deps via direct RangeKey path
225                    let tr0 = Instant::now();
226                    if let Some(rks) = plan.per_formula_ranges.get(fi) {
227                        n_range_deps += rks.len();
228                        self.g.add_range_deps_from_keys(tvid, rks, stage.id);
229                    }
230                    t_ranges_ms += tr0.elapsed().as_millis();
231                    // Always add adjacency row for target (may be empty)
232                    edges_adj.push((tvid.0, row.into_vec()));
233                }
234                t_edges_ms = ted0.elapsed().as_millis();
235            }
236            if dbg {
237                eprintln!(
238                    "[fz][ingest] sheet '{}' done: values={}ms plan={}ms ensure={}ms assign={}ms edges={}ms ranges={}ms targets={} globals={} cell_deps={} range_groups={} total={}ms",
239                    stage.name,
240                    t_values_ms,
241                    t_plan_ms,
242                    t_ensure_ms,
243                    t_assign_ms,
244                    t_edges_ms,
245                    t_ranges_ms,
246                    n_targets,
247                    n_globals,
248                    n_cell_deps,
249                    n_range_deps,
250                    t_sheet0.elapsed().as_millis()
251                );
252            }
253        }
254        if dbg {
255            eprintln!("[fz][ingest] beginning finalize");
256        }
257
258        // Finalize: pick strategy based on graph size and number of edge rows
259        if !edges_adj.is_empty() {
260            let rows = edges_adj.len();
261            let total_vertices_now = self.g.vertex_count();
262            let t_fin0 = Instant::now();
263            if dbg {
264                eprintln!(
265                    "[fz][ingest] finalize: start rows={rows}, vertices={total_vertices_now}"
266                );
267            }
268            // Heuristic: avoid one-shot CSR when vertices are huge and rows are sparse
269            let sparse_vs_huge =
270                total_vertices_now > 800_000 && (rows as f64) / (total_vertices_now as f64) < 0.05;
271            if sparse_vs_huge {
272                let t_delta0 = Instant::now();
273                if dbg {
274                    eprintln!("[fz][ingest] finalize: using delta path (begin)");
275                }
276                self.g.begin_batch();
277                for (tvid_raw, row) in &edges_adj {
278                    let tvid = crate::engine::vertex::VertexId(*tvid_raw);
279                    if !row.is_empty() {
280                        let deps: Vec<crate::engine::vertex::VertexId> = row
281                            .iter()
282                            .map(|d| crate::engine::vertex::VertexId(*d))
283                            .collect();
284                        self.g.add_edges_nobatch(tvid, &deps);
285                    }
286                }
287                self.g.end_batch();
288                if dbg {
289                    eprintln!(
290                        "[fz][ingest] finalize: delta done in {} ms (total {} ms)",
291                        t_delta0.elapsed().as_millis(),
292                        t_fin0.elapsed().as_millis()
293                    );
294                }
295            } else {
296                // One-shot CSR build from accumulated adjacency and coords/ids
297                let mut t_coords_ms = 0u128;
298                if coord_accum.is_empty() || id_accum.is_empty() {
299                    if dbg {
300                        eprintln!("[fz][ingest] finalize: gathering coords/ids");
301                    }
302                    let t_coords0 = Instant::now();
303                    for vid in self.g.iter_vertex_ids() {
304                        coord_accum.push(self.g.vertex_coord(vid));
305                        id_accum.push(vid.0);
306                    }
307                    t_coords_ms = t_coords0.elapsed().as_millis();
308                }
309                if dbg {
310                    eprintln!("[fz][ingest] finalize: building CSR");
311                }
312                let t_csr0 = Instant::now();
313                self.g
314                    .build_edges_from_adjacency(edges_adj, coord_accum, id_accum);
315                if dbg {
316                    eprintln!(
317                        "[fz][ingest] finalize: rows={}, gather_coords={} ms, csr_build={} ms, total={} ms",
318                        rows,
319                        t_coords_ms,
320                        t_csr0.elapsed().as_millis(),
321                        t_fin0.elapsed().as_millis()
322                    );
323                }
324            }
325        }
326
327        // Restore config
328        self.g.set_sheet_index_mode(self.cfg_saved.sheet_index_mode);
329        Ok(BulkIngestSummary {
330            sheets: 0, // could populate later
331            vertices: total_vertices,
332            formulas: total_formulas,
333            edges: total_edges,
334            elapsed: t0.elapsed(),
335        })
336    }
337}