use crate::engine::graph::DependencyGraph;
use formualizer_common::Coord as AbsCoord;
use crate::engine::EvalConfig;
use crate::{SheetId, engine::vertex::VertexId};
use formualizer_common::ExcelError;
use formualizer_parse::parser::{ASTNode, CollectPolicy};
use rustc_hash::FxHashMap;
/// Totals reported by [`BulkIngestBuilder::finish`] once a bulk ingest has run.
#[derive(Debug, Clone)]
pub struct BulkIngestSummary {
    /// Sheet count reported for the ingest.
    pub sheets: usize,
    /// Sum of the sizes of the vertex batches returned while ensuring vertices.
    pub vertices: usize,
    /// Number of formula target vertices that were assigned a formula.
    pub formulas: usize,
    /// Number of direct cell dependencies counted across all formulas.
    pub edges: usize,
    /// Time elapsed between the start of `finish` and the creation of this summary.
    pub elapsed: std::time::Duration,
}
/// Per-sheet staging area holding formulas queued for bulk ingest.
struct SheetStage {
    /// Sheet name; passed to dependency planning and used in debug output.
    name: String,
    /// Identifier of the sheet inside the dependency graph.
    id: SheetId,
    /// Queued formulas as `(row, col, ast, volatile)` tuples, in insertion order.
    formulas: Vec<(u32, u32, ASTNode, bool)>,
}
impl SheetStage {
    /// Creates an empty stage for the sheet `name` / `id` with no queued formulas.
    fn new(name: String, id: SheetId) -> Self {
        let formulas = Vec::new();
        Self { name, id, formulas }
    }
}
/// Stages formulas per sheet and ingests them into a [`DependencyGraph`] in bulk.
///
/// The builder holds the graph mutably for its whole lifetime `'g`.
pub struct BulkIngestBuilder<'g> {
    /// Graph that receives the vertices, formulas and edges.
    g: &'g mut DependencyGraph,
    /// Staged formulas keyed by sheet id.
    sheets: FxHashMap<SheetId, SheetStage>,
    /// Copy of the graph configuration taken when the builder was created.
    cfg_saved: EvalConfig,
    /// Reusable buffer of per-formula volatility flags handed to dependency planning.
    vols_buf: Vec<bool>,
}
impl<'g> BulkIngestBuilder<'g> {
/// Creates a builder over `g` with no staged sheets.
///
/// A clone of the graph's current configuration is kept so that `finish` can
/// restore the sheet index mode afterwards.
pub fn new(g: &'g mut DependencyGraph) -> Self {
    let config_snapshot = g.get_config().clone();
    let sheets = FxHashMap::default();
    let vols_buf = Vec::new();
    Self {
        g,
        sheets,
        cfg_saved: config_snapshot,
        vols_buf,
    }
}
/// Registers the sheet called `name` for staging and returns its id.
///
/// Calling this again for an already-registered sheet keeps the existing stage
/// (and any formulas queued on it).
///
/// # Panics
/// Panics if the graph does not already know a sheet with this name; the sheet
/// must be created on the engine before it can be staged here.
pub fn add_sheet(&mut self, name: &str) -> SheetId {
    let Some(sheet_id) = self.g.sheet_id(name) else {
        panic!(
            "BulkIngestBuilder::add_sheet requires pre-existing sheet; call Engine::add_sheet first: {name}"
        );
    };
    // Only create a stage the first time this sheet is seen.
    self.sheets
        .entry(sheet_id)
        .or_insert_with(|| SheetStage::new(name.to_owned(), sheet_id));
    sheet_id
}
/// Queues `(row, col, ast)` formulas for `sheet`.
///
/// If the sheet has not been staged yet, a stage is created using the name the
/// graph reports for `sheet`. Each formula's volatility is computed once here
/// and stored alongside it for use during `finish`.
pub fn add_formulas<I>(&mut self, sheet: SheetId, formulas: I)
where
    I: IntoIterator<Item = (u32, u32, ASTNode)>,
{
    let stage = self.sheets.entry(sheet).or_insert_with(|| {
        let sheet_name = self.g.sheet_name(sheet).to_string();
        SheetStage::new(sheet_name, sheet)
    });
    // Tag every incoming formula with its volatility flag while appending.
    let tagged = formulas.into_iter().map(|(row, col, ast)| {
        let volatile = Self::is_ast_volatile(&ast);
        (row, col, ast, volatile)
    });
    stage.formulas.extend(tagged);
}
/// Returns `true` when `ast` should be treated as volatile.
///
/// A node is volatile if the parser already flags it (`contains_volatile`), if
/// it is a call to a function registered (under the empty namespace) with the
/// `VOLATILE` capability, or if any function argument, operator operand or
/// array element is itself volatile. All other node kinds are non-volatile.
fn is_ast_volatile(ast: &ASTNode) -> bool {
    use formualizer_parse::parser::ASTNodeType;

    // Fast path: trust the parser's own volatility flag.
    if ast.contains_volatile() {
        return true;
    }

    match &ast.node_type {
        ASTNodeType::Function { name, args } => {
            // Registry lookup first, then recurse into the arguments.
            crate::function_registry::get("", name)
                .is_some_and(|func| func.caps().contains(crate::function::FnCaps::VOLATILE))
                || args.iter().any(Self::is_ast_volatile)
        }
        ASTNodeType::BinaryOp { left, right, .. } => {
            Self::is_ast_volatile(left) || Self::is_ast_volatile(right)
        }
        ASTNodeType::UnaryOp { expr, .. } => Self::is_ast_volatile(expr),
        ASTNodeType::Array(rows) => rows
            .iter()
            .flat_map(|row| row.iter())
            .any(Self::is_ast_volatile),
        _ => false,
    }
}
pub fn finish(mut self) -> Result<BulkIngestSummary, ExcelError> {
use crate::instant::FzInstant as Instant;
let t0 = Instant::now();
let dbg = std::env::var("FZ_DEBUG_INGEST")
.ok()
.is_some_and(|v| v != "0")
|| std::env::var("FZ_DEBUG_LOAD")
.ok()
.is_some_and(|v| v != "0");
let mut total_vertices = 0usize;
let mut total_formulas = 0usize;
let mut total_edges = 0usize;
if dbg {
eprintln!(
"[fz][ingest] starting bulk ingest with {} sheets",
self.sheets.len()
);
}
let mut edges_adj: Vec<(u32, Vec<u32>)> = Vec::new();
let mut coord_accum: Vec<AbsCoord> = Vec::new();
let mut id_accum: Vec<u32> = Vec::new();
for (_sid, mut stage) in self.sheets.drain() {
let t_sheet0 = Instant::now();
let mut t_plan_ms = 0u128;
let mut t_ensure_ms = 0u128;
let mut t_assign_ms = 0u128;
let mut t_edges_ms = 0u128;
let mut t_ranges_ms = 0u128;
let mut n_targets = 0usize;
let mut n_globals = 0usize;
let mut n_cell_deps = 0usize;
let mut n_range_deps = 0usize;
if dbg {
eprintln!("[fz][ingest] sheet '{}' begin", stage.name);
}
if !stage.formulas.is_empty() {
let formula_batch_size: usize = std::env::var("FZ_INGEST_FORMULA_BATCH")
.ok()
.and_then(|s| s.parse().ok())
.filter(|&n| n > 0)
.unwrap_or(10_000);
let mut batch_count = 0usize;
for chunk in stage.formulas.chunks_mut(formula_batch_size) {
batch_count += 1;
for (r, c, ast, _vol) in chunk.iter_mut() {
let coord = crate::reference::Coord::from_excel(*r, *c, true, true);
let cell = crate::reference::CellRef::new(stage.id, coord);
self.g.rewrite_structured_references_for_cell(ast, cell)?;
}
let tp0 = Instant::now();
let refs = chunk
.iter()
.map(|(r, c, ast, _)| (stage.name.as_str(), *r, *c, ast));
self.vols_buf.clear();
self.vols_buf.reserve(chunk.len());
for &(_, _, _, v) in chunk.iter() {
self.vols_buf.push(v);
}
let policy = CollectPolicy {
expand_small_ranges: true,
range_expansion_limit: self.g.range_expansion_limit(),
include_names: true,
};
let plan = self
.g
.plan_dependencies(refs, &policy, Some(&self.vols_buf))?;
edges_adj.reserve(plan.formula_targets.len());
t_plan_ms += tp0.elapsed().as_millis();
n_targets += plan.formula_targets.len();
n_globals += plan.global_cells.len();
self.g.reserve_cells(plan.vertex_pool.len());
let te0 = Instant::now();
let (all_vids, add_batch) = self
.g
.ensure_vertices_batch_packed_ordered(&plan.vertex_pool_packed);
total_vertices += add_batch.len();
if !add_batch.is_empty() {
for (pc, id) in &add_batch {
coord_accum.push(*pc);
id_accum.push(*id);
}
}
t_ensure_ms += te0.elapsed().as_millis();
let ta0 = Instant::now();
self.g.reserve_formula_metadata(plan.formula_targets.len());
let ast_ids = self
.g
.store_asts_batch(chunk.iter().map(|(_, _, ast, _)| ast));
let mut dep_vids: Vec<VertexId> = Vec::with_capacity(plan.global_cells.len());
for &pos in &plan.global_cell_pool_indices {
dep_vids.push(all_vids[pos as usize]);
}
let mut target_vids: Vec<VertexId> =
Vec::with_capacity(plan.formula_targets.len());
let load_fast = self.g.first_load_assume_new();
for (i, &pos) in plan.formula_target_pool_indices.iter().enumerate() {
let vid = all_vids[pos as usize];
target_vids.push(vid);
let ast_ref = &chunk[i].2;
let dynamic = self.g.is_ast_dynamic(ast_ref);
if load_fast {
self.g.assign_formula_vertex_load_fast(
vid, ast_ids[i], chunk[i].3, dynamic,
);
} else {
self.g
.assign_formula_vertex(vid, ast_ids[i], chunk[i].3, dynamic);
}
}
self.g.mark_vertices_dirty_batch(&target_vids);
total_formulas += target_vids.len();
t_assign_ms += ta0.elapsed().as_millis();
let ted0 = Instant::now();
for (fi, &tvid) in target_vids.iter().enumerate() {
let mut row: smallvec::SmallVec<[u32; 8]> = smallvec::SmallVec::new();
if let Some(indices) = plan.per_formula_cells.get(fi) {
let mut dep_count = 0usize;
row.reserve(indices.len());
for &idx in indices {
let dep_vid = dep_vids[idx as usize];
row.push(dep_vid.0);
dep_count += 1;
}
total_edges += dep_count;
n_cell_deps += dep_count;
}
let tr0 = Instant::now();
if let Some(rks) = plan.per_formula_ranges.get(fi) {
n_range_deps += rks.len();
self.g.add_range_deps_from_keys(tvid, rks, stage.id);
}
t_ranges_ms += tr0.elapsed().as_millis();
if let Some(names) = plan.per_formula_names.get(fi)
&& !names.is_empty()
{
let mut name_vertices = Vec::new();
let (formula_sheet, _) = plan
.formula_targets
.get(fi)
.copied()
.unwrap_or((stage.id, AbsCoord::new(1, 1)));
for name in names {
if let Some(named) = self.g.resolve_name_entry(name, formula_sheet)
{
row.push(named.vertex.0);
name_vertices.push(named.vertex);
} else if let Some(source) =
self.g.resolve_source_scalar_entry(name)
{
row.push(source.vertex.0);
} else {
self.g
.record_pending_name_reference(formula_sheet, name, tvid);
}
}
if !name_vertices.is_empty() {
self.g.attach_vertex_to_names(tvid, &name_vertices);
}
}
if let Some(tables) = plan.per_formula_tables.get(fi)
&& !tables.is_empty()
{
for table_name in tables {
if let Some(table) = self.g.resolve_table_entry(table_name) {
row.push(table.vertex.0);
} else if let Some(source) =
self.g.resolve_source_table_entry(table_name)
{
row.push(source.vertex.0);
}
}
}
edges_adj.push((tvid.0, row.into_vec()));
}
t_edges_ms += ted0.elapsed().as_millis();
}
if dbg && batch_count > 1 {
eprintln!(
"[fz][ingest] sheet '{}' processed in {} formula batches (batch_size={})",
stage.name, batch_count, formula_batch_size
);
}
}
if dbg {
eprintln!(
"[fz][ingest] sheet '{}' done: plan={}ms ensure={}ms assign={}ms edges={}ms ranges={}ms targets={} globals={} cell_deps={} range_groups={} total={}ms",
stage.name,
t_plan_ms,
t_ensure_ms,
t_assign_ms,
t_edges_ms,
t_ranges_ms,
n_targets,
n_globals,
n_cell_deps,
n_range_deps,
t_sheet0.elapsed().as_millis()
);
}
}
if dbg {
eprintln!("[fz][ingest] beginning finalize");
}
if !edges_adj.is_empty() {
let rows = edges_adj.len();
let total_vertices_now = self.g.vertex_count();
let t_fin0 = Instant::now();
if dbg {
eprintln!(
"[fz][ingest] finalize: start rows={rows}, vertices={total_vertices_now}"
);
}
let sparse_vs_huge =
total_vertices_now > 800_000 && (rows as f64) / (total_vertices_now as f64) < 0.05;
if sparse_vs_huge {
let t_delta0 = Instant::now();
if dbg {
eprintln!("[fz][ingest] finalize: using delta path (begin)");
}
self.g.begin_batch();
for (tvid_raw, row) in &edges_adj {
let tvid = crate::engine::vertex::VertexId(*tvid_raw);
if !row.is_empty() {
let deps: Vec<crate::engine::vertex::VertexId> = row
.iter()
.map(|d| crate::engine::vertex::VertexId(*d))
.collect();
self.g.add_edges_nobatch(tvid, &deps);
}
}
self.g.end_batch();
if dbg {
eprintln!(
"[fz][ingest] finalize: delta done in {} ms (total {} ms)",
t_delta0.elapsed().as_millis(),
t_fin0.elapsed().as_millis()
);
}
} else {
let mut t_coords_ms = 0u128;
if coord_accum.is_empty() || id_accum.is_empty() {
if dbg {
eprintln!("[fz][ingest] finalize: gathering coords/ids");
}
let t_coords0 = Instant::now();
for vid in self.g.iter_vertex_ids() {
coord_accum.push(self.g.vertex_coord(vid));
id_accum.push(vid.0);
}
t_coords_ms = t_coords0.elapsed().as_millis();
}
if dbg {
eprintln!("[fz][ingest] finalize: building CSR");
}
let t_csr0 = Instant::now();
self.g
.build_edges_from_adjacency(edges_adj, coord_accum, id_accum);
if dbg {
eprintln!(
"[fz][ingest] finalize: rows={}, gather_coords={} ms, csr_build={} ms, total={} ms",
rows,
t_coords_ms,
t_csr0.elapsed().as_millis(),
t_fin0.elapsed().as_millis()
);
}
}
}
self.g.set_sheet_index_mode(self.cfg_saved.sheet_index_mode);
Ok(BulkIngestSummary {
sheets: 0, vertices: total_vertices,
formulas: total_formulas,
edges: total_edges,
elapsed: t0.elapsed(),
})
}
}