Skip to main content

bb_compiler/
runner.rs

//! Pipeline orchestrator. Runs the canonical passes in order and emits
//! one `ModelProto` per partition; in each model `functions[0]` is the
//! partition main and `functions[1..]` are the shared sub-Module bodies
//! that survived inlining. Driven from [`crate::driver::Compiler::compile`].
5
6use std::collections::HashSet;
7
8use crate::error::CompileError;
9use crate::infer_peer_classes::infer_peer_classes;
10use crate::{
11    analyze_wire_edges, derive_wire_deadlines, expand_ops, inline_for_partition,
12    insert_async_deadlines, insert_backoff_gate_rx, insert_backoff_gate_tx, insert_dedup_gate_rx,
13    insert_peer_health_gate_rx, insert_peer_health_gate_tx, partition_by_wire_ops, resolve_slots,
14    synthesize_wire_recvs, validate, validate_bootstrap_composition, validate_runtime_complete,
15    verify_no_dangling_calls,
16};
17use bb_dsl::recorded::RecordedModule;
18use bb_ir::proto::onnx::{FunctionProto, GraphProto, ModelProto};
19
/// Every canonical pass the pipeline knows about, in the exact order
/// the runner invokes them. Later passes rely on invariants set up by
/// earlier ones, so the order is significant.
///
/// A pass runs only when its name appears in the caller's `enabled`
/// set — except `partition_by_wire_ops`, which is listed for ordering
/// but always runs because the per-partition passes consume its output.
pub const CANONICAL_PASS_NAMES: &[&str] = &[
    // Whole-module passes (run on the combined function table).
    "inline_for_partition",
    "derive_wire_deadlines",
    // Target-graph passes (run on the root function before partitioning).
    "validate",
    "expand_ops",
    "type_solver",
    "infer_peer_classes",
    "synthesize_wire_recvs",
    // Partitioning plus slot / wire-edge analysis.
    "partition_by_wire_ops",
    "resolve_slots",
    "analyze_wire_edges",
    // Per-partition gate insertion and the final completeness check.
    "insert_dedup_gate_rx",
    "insert_peer_health_gate_rx",
    "insert_backoff_gate_rx",
    "insert_peer_health_gate_tx",
    "insert_backoff_gate_tx",
    "insert_async_deadlines",
    "validate_runtime_complete",
];
41
42/// Run the pipeline; canonical passes fire only when named in
43/// `enabled`. Wire ops in the root drive partitioning after
44/// `inline_for_partition` surfaces them.
45pub(crate) fn run_pipeline_with_options(
46    recorded: RecordedModule,
47    _module_name: String,
48    enabled: &HashSet<String>,
49    per_hop_budget_ns: u64,
50    strict_types: bool,
51) -> Result<Vec<ModelProto>, CompileError> {
52    let RecordedModule {
53        function,
54        sub_functions: dsl_sub_functions,
55    } = recorded;
56
57    let on = |name: &str| enabled.contains(name);
58
59    let mut temp = ModelProto::default();
60    temp.functions.push(function);
61    temp.functions.extend(dsl_sub_functions);
62    if on("inline_for_partition") {
63        inline_for_partition(&mut temp)?;
64    }
65    if on("derive_wire_deadlines") {
66        derive_wire_deadlines(&mut temp, per_hop_budget_ns)?;
67    }
68
69    // Seam verifiers catch malformed phase output at the source.
70    bb_ir::verify::types(&temp).map_err(|e| CompileError::Internal {
71        detail: format!("verify::types failed at frontend seam: {e}"),
72    })?;
73    bb_ir::verify::function_calls(&temp).map_err(|e| CompileError::Internal {
74        detail: format!("verify::function_calls failed at frontend seam: {e}"),
75    })?;
76
77    let mut models: Vec<ModelProto> = Vec::new();
78
79    // After inline_for_partition, functions[0] is the root and the
80    // only "target" — partition_by_wire_ops will slice it into one
81    // installable per connected component. Surviving non-inlined
82    // FunctionProtos are shared and attach to each emitted ModelProto.
83    let shared_functions: Vec<FunctionProto> = temp.functions.iter().skip(1).cloned().collect();
84    let root = temp
85        .functions
86        .first()
87        .ok_or_else(|| CompileError::Internal {
88            detail: "compiler received an empty function table".into(),
89        })?;
90    let root_name = root.name.clone();
91
92    // Validate the bootstrap composition tree against the full
93    // top-level model view — the `<root>__bootstrap` FunctionProto
94    // and every child bootstrap still live in `temp.functions`
95    // here; per-partition passes never see the bootstrap table.
96    validate_bootstrap_composition(&temp, &root_name)?;
97
98    let root = temp
99        .functions
100        .into_iter()
101        .next()
102        .expect("non-empty checked above");
103    let target_models = process_target(root, &root_name, enabled, &shared_functions, strict_types)?;
104    models.extend(target_models);
105
106    if models.is_empty() {
107        return Err(CompileError::Internal {
108            detail: "compiler produced no partitions - recorded function was empty".into(),
109        });
110    }
111    Ok(models)
112}
113
114/// Run the per-target pipeline: validate, structural transforms,
115/// partition by wire ops, then per-partition transforms, emitting
116/// one `ModelProto` per home-class partition the target produces.
117/// `shared_functions` are FunctionProtos that survived
118/// `inline_for_partition` and are carried into every emitted
119/// ModelProto so CALL nodes still resolve.
120fn process_target(
121    mut target_function: FunctionProto,
122    target_name: &str,
123    enabled: &HashSet<String>,
124    shared_functions: &[FunctionProto],
125    strict_types: bool,
126) -> Result<Vec<ModelProto>, CompileError> {
127    let on = |name: &str| enabled.contains(name);
128
129    if on("validate") {
130        let view = function_to_graph_view(&target_function);
131        validate(&view).map_err(CompileError::Validation)?;
132    }
133
134    let view = function_to_graph_view(&target_function);
135    let mut graph = view;
136    if on("expand_ops") {
137        expand_ops(&mut graph)?;
138    }
139    // Track whether the type_solver pass ran so `check_wire_edge_types`
140    // (which needs a TypeSolution) is only invoked when the solver did run.
141    let type_solver_ran = if on("type_solver") {
142        run_type_solver(&mut graph, strict_types)?;
143        true
144    } else {
145        false
146    };
147    if on("infer_peer_classes") {
148        infer_peer_classes(&mut graph)?;
149    }
150    if on("synthesize_wire_recvs") {
151        synthesize_wire_recvs(&mut graph)?;
152        // check_wire_edge_types must run AFTER synthesis: the Recv
153        // nodes need to exist, and the freshly minted recv-output
154        // value names need to be in the TypeSolution before the
155        // send-vs-recv type compare is meaningful.
156        if type_solver_ran {
157            let fresh_solution = run_type_solver(&mut graph, strict_types)?;
158            check_wire_edge_types(&graph, &fresh_solution)?;
159        }
160    }
161    target_function = merge_graph_into_function(target_function, graph);
162
163    let view = function_to_graph_view(&target_function);
164    let mut analysis = partition_by_wire_ops(&view)?;
165
166    if on("resolve_slots") {
167        resolve_slots(&target_function)?;
168    }
169
170    // analyze_wire_edges stamps classification metadata onto each
171    // sub_graph's Send/Recv nodes; analysis.wire_edges drives the
172    // iteration read-only.
173    if on("analyze_wire_edges") {
174        for sub_graph in analysis.per_role.values_mut() {
175            analyze_wire_edges(sub_graph, &analysis.wire_edges)?;
176        }
177    }
178
179    let mut models: Vec<ModelProto> = Vec::new();
180    for (role, mut sub_graph) in analysis.per_role {
181        let hoisted: Vec<FunctionProto> = Vec::new();
182
183        if on("insert_dedup_gate_rx") {
184            insert_dedup_gate_rx(&mut sub_graph)?;
185        }
186        if on("insert_peer_health_gate_rx") {
187            insert_peer_health_gate_rx(&mut sub_graph)?;
188        }
189        if on("insert_backoff_gate_rx") {
190            insert_backoff_gate_rx(&mut sub_graph)?;
191        }
192        if on("insert_peer_health_gate_tx") {
193            insert_peer_health_gate_tx(&mut sub_graph)?;
194        }
195        if on("insert_backoff_gate_tx") {
196            insert_backoff_gate_tx(&mut sub_graph)?;
197        }
198        if on("insert_async_deadlines") {
199            insert_async_deadlines(&mut sub_graph)?;
200        }
201
202        if on("validate_runtime_complete") {
203            validate_runtime_complete(&sub_graph)?;
204        }
205
206        let (composite_name, mut partition_function) =
207            split_partition(&target_function, role.clone(), &sub_graph, target_name);
208        partition_function.name = composite_name;
209        let mut all_hoisted = hoisted;
210        all_hoisted.extend(shared_functions.iter().cloned());
211        let model = wrap_as_model(partition_function, all_hoisted);
212        verify_no_dangling_calls(&model)?;
213        models.push(model);
214    }
215
216    Ok(models)
217}
218
219use bb_ir::proto::function_to_graph_view;
220
221fn merge_graph_into_function(mut function: FunctionProto, graph: GraphProto) -> FunctionProto {
222    function.node = graph.node;
223    function
224}
225
226fn split_partition(
227    base: &FunctionProto,
228    role: String,
229    sub_graph: &GraphProto,
230    module_name: &str,
231) -> (String, FunctionProto) {
232    let mut function = base.clone();
233    function.node = sub_graph.node.clone();
234    let base_name = if role == "@default" || role == bb_ir::peer_class::SELF_CLASS {
235        module_name.to_string()
236    } else if role == module_name || role.starts_with(&format!("{module_name}_")) {
237        role
238    } else {
239        format!("{module_name}_{role}")
240    };
241    // 16-hex-char (full u64) suffix gives a ~4 G-partition
242    // birthday-collision bound. Identical content under the same
243    // base name collides intentionally (it IS the same partition);
244    // a body edit shifts the hash so snapshots remain partition-keyed.
245    let content_hash = crate::function_dedup::hash_node_bodies(&sub_graph.node);
246    let composite_name = format!("{base_name}#{content_hash:016x}");
247    (composite_name, function)
248}
249
250fn wrap_as_model(function: FunctionProto, hoisted: Vec<FunctionProto>) -> ModelProto {
251    let mut functions = Vec::with_capacity(1 + hoisted.len());
252    functions.push(function);
253    functions.extend(hoisted);
254    ModelProto {
255        functions,
256        ..Default::default()
257    }
258}
259
260/// Run the TypeSolver pass over `graph`, narrowing each value's
261/// type from `TYPE_BYTES` to the most specific TypeNode
262/// that connected ops' `type_relations` declarations permit.
263///
264/// Permissive-by-default: values whose constraints don't fully
265/// resolve stay at `TYPE_ANY`. Set `strict = true` to surface
266/// unresolved values as a typed `BuildError::UnresolvedType` —
267/// useful for users who want a hard guarantee at compile time
268/// that every input has a concrete TypeNode.
269///
270/// Op `(domain, op_type)` → `AtomicOpDecl` lookup walks every
271/// registered opset (placeholders, syscalls, backends, custom
272/// `bb::register_op!{}` entries) via the existing inventory
273/// channel.
274/// Run the TypeSolver pass over `graph` and return the resolved
275/// `TypeSolution`. The solution is also applied to `graph.value_info`
276/// in place.
277///
278/// `check_wire_edge_types` is NOT called here — it must run AFTER
279/// `synthesize_wire_recvs` so that Recv nodes exist in the graph and
280/// the minted recv-output value names are present in the solution.
281/// Callers should call `run_type_solver` again (or
282/// `check_wire_edge_types` directly with a fresh solution) after
283/// synthesis to perform the wire-edge check.
284fn run_type_solver(
285    graph: &mut GraphProto,
286    strict: bool,
287) -> Result<crate::type_solver::TypeSolution, CompileError> {
288    let decl_for_op = |_: &str, _: &str| -> Option<&'static bb_ir::atomic::AtomicOpDecl> { None };
289
290    let mut solver = crate::type_solver::TypeSolver::from_graph(graph, decl_for_op)
291        .map_err(CompileError::from)?;
292    solver.seed_from_value_info(graph);
293    let solution = if strict {
294        solver.solve_strict().map_err(CompileError::from)?
295    } else {
296        solver.solve().map_err(CompileError::from)?
297    };
298    crate::type_solver::TypeSolver::apply_solution_to_value_info(graph, &solution);
299    Ok(solution)
300}
301
302/// Walk `graph.node` for synthesized `wire.Recv` NodeProtos. For each
303/// Recv that carries `SYNTHESIZED_FROM_KEY` metadata, compare the
304/// resolved type of the send-side value against the resolved type of
305/// the recv-side value. If both are concrete and different, no `Codec`
306/// bridge was wired on the edge and the author must insert one.
307///
308/// Exposed as `pub(crate)` so `type_solver_tests` can exercise the
309/// check in isolation against hand-built graphs.
310pub(crate) fn check_wire_edge_types(
311    graph: &GraphProto,
312    solution: &crate::type_solver::TypeSolution,
313) -> Result<(), CompileError> {
314    const SYNTHESIZED_FROM_KEY: &str = "ai.bytesandbrains.synthesized_from_send";
315    const WIRE_DOMAIN: &str = "ai.bytesandbrains.wire";
316
317    for node in &graph.node {
318        if node.domain != WIRE_DOMAIN || node.op_type != "Recv" {
319            continue;
320        }
321        // Look up the send-side value name from the SYNTHESIZED_FROM_KEY
322        // metadata that `synthesize_wire_recvs` stamps on every Recv.
323        let Some(src_val) = node
324            .metadata_props
325            .iter()
326            .find(|p| p.key == SYNTHESIZED_FROM_KEY)
327            .map(|p| p.value.as_str())
328        else {
329            continue;
330        };
331        // The recv-side value name is the Recv's first output.
332        let Some(dst_val) = node.output.first().filter(|s| !s.is_empty()) else {
333            continue;
334        };
335
336        let Some(actual_node) = solution.type_of(src_val) else {
337            continue;
338        };
339        let Some(expected_node) = solution.type_of(dst_val) else {
340            continue;
341        };
342
343        if !actual_node.is_concrete() || !expected_node.is_concrete() {
344            continue;
345        }
346        if std::ptr::eq(actual_node, expected_node) {
347            continue;
348        }
349
350        return Err(CompileError::IncompatibleStorageOnEdge {
351            src: src_val.to_string(),
352            dst: dst_val.to_string(),
353            expected_id: expected_node.id,
354            actual_id: actual_node.id,
355        });
356    }
357    Ok(())
358}