1use std::collections::HashSet;
7
8use crate::error::CompileError;
9use crate::infer_peer_classes::infer_peer_classes;
10use crate::{
11 analyze_wire_edges, derive_wire_deadlines, expand_ops, inline_for_partition,
12 insert_async_deadlines, insert_backoff_gate_rx, insert_backoff_gate_tx, insert_dedup_gate_rx,
13 insert_peer_health_gate_rx, insert_peer_health_gate_tx, partition_by_wire_ops, resolve_slots,
14 synthesize_wire_recvs, validate, validate_bootstrap_composition, validate_runtime_complete,
15 verify_no_dangling_calls,
16};
17use bb_dsl::recorded::RecordedModule;
18use bb_ir::proto::onnx::{FunctionProto, GraphProto, ModelProto};
19
/// Names of the compiler passes, listed in the order the pipeline in this
/// file applies them.
///
/// Each name is the key looked up in the caller-supplied `enabled` set to
/// decide whether the corresponding pass runs. Note that
/// `partition_by_wire_ops` is listed here but `process_target` runs it
/// regardless of the `enabled` set.
pub const CANONICAL_PASS_NAMES: &[&str] = &[
    // Whole-module passes (run before the root function is split off).
    "inline_for_partition",
    "derive_wire_deadlines",
    // Root-function passes (run on the graph view of the root).
    "validate",
    "expand_ops",
    "type_solver",
    "infer_peer_classes",
    "synthesize_wire_recvs",
    // Partitioning and cross-partition analysis.
    "partition_by_wire_ops",
    "resolve_slots",
    "analyze_wire_edges",
    // Per-partition passes.
    "insert_dedup_gate_rx",
    "insert_peer_health_gate_rx",
    "insert_backoff_gate_rx",
    "insert_peer_health_gate_tx",
    "insert_backoff_gate_tx",
    "insert_async_deadlines",
    "validate_runtime_complete",
];
41
42pub(crate) fn run_pipeline_with_options(
46 recorded: RecordedModule,
47 _module_name: String,
48 enabled: &HashSet<String>,
49 per_hop_budget_ns: u64,
50 strict_types: bool,
51) -> Result<Vec<ModelProto>, CompileError> {
52 let RecordedModule {
53 function,
54 sub_functions: dsl_sub_functions,
55 } = recorded;
56
57 let on = |name: &str| enabled.contains(name);
58
59 let mut temp = ModelProto::default();
60 temp.functions.push(function);
61 temp.functions.extend(dsl_sub_functions);
62 if on("inline_for_partition") {
63 inline_for_partition(&mut temp)?;
64 }
65 if on("derive_wire_deadlines") {
66 derive_wire_deadlines(&mut temp, per_hop_budget_ns)?;
67 }
68
69 bb_ir::verify::types(&temp).map_err(|e| CompileError::Internal {
71 detail: format!("verify::types failed at frontend seam: {e}"),
72 })?;
73 bb_ir::verify::function_calls(&temp).map_err(|e| CompileError::Internal {
74 detail: format!("verify::function_calls failed at frontend seam: {e}"),
75 })?;
76
77 let mut models: Vec<ModelProto> = Vec::new();
78
79 let shared_functions: Vec<FunctionProto> = temp.functions.iter().skip(1).cloned().collect();
84 let root = temp
85 .functions
86 .first()
87 .ok_or_else(|| CompileError::Internal {
88 detail: "compiler received an empty function table".into(),
89 })?;
90 let root_name = root.name.clone();
91
92 validate_bootstrap_composition(&temp, &root_name)?;
97
98 let root = temp
99 .functions
100 .into_iter()
101 .next()
102 .expect("non-empty checked above");
103 let target_models = process_target(root, &root_name, enabled, &shared_functions, strict_types)?;
104 models.extend(target_models);
105
106 if models.is_empty() {
107 return Err(CompileError::Internal {
108 detail: "compiler produced no partitions - recorded function was empty".into(),
109 });
110 }
111 Ok(models)
112}
113
114fn process_target(
121 mut target_function: FunctionProto,
122 target_name: &str,
123 enabled: &HashSet<String>,
124 shared_functions: &[FunctionProto],
125 strict_types: bool,
126) -> Result<Vec<ModelProto>, CompileError> {
127 let on = |name: &str| enabled.contains(name);
128
129 if on("validate") {
130 let view = function_to_graph_view(&target_function);
131 validate(&view).map_err(CompileError::Validation)?;
132 }
133
134 let view = function_to_graph_view(&target_function);
135 let mut graph = view;
136 if on("expand_ops") {
137 expand_ops(&mut graph)?;
138 }
139 let type_solver_ran = if on("type_solver") {
142 run_type_solver(&mut graph, strict_types)?;
143 true
144 } else {
145 false
146 };
147 if on("infer_peer_classes") {
148 infer_peer_classes(&mut graph)?;
149 }
150 if on("synthesize_wire_recvs") {
151 synthesize_wire_recvs(&mut graph)?;
152 if type_solver_ran {
157 let fresh_solution = run_type_solver(&mut graph, strict_types)?;
158 check_wire_edge_types(&graph, &fresh_solution)?;
159 }
160 }
161 target_function = merge_graph_into_function(target_function, graph);
162
163 let view = function_to_graph_view(&target_function);
164 let mut analysis = partition_by_wire_ops(&view)?;
165
166 if on("resolve_slots") {
167 resolve_slots(&target_function)?;
168 }
169
170 if on("analyze_wire_edges") {
174 for sub_graph in analysis.per_role.values_mut() {
175 analyze_wire_edges(sub_graph, &analysis.wire_edges)?;
176 }
177 }
178
179 let mut models: Vec<ModelProto> = Vec::new();
180 for (role, mut sub_graph) in analysis.per_role {
181 let hoisted: Vec<FunctionProto> = Vec::new();
182
183 if on("insert_dedup_gate_rx") {
184 insert_dedup_gate_rx(&mut sub_graph)?;
185 }
186 if on("insert_peer_health_gate_rx") {
187 insert_peer_health_gate_rx(&mut sub_graph)?;
188 }
189 if on("insert_backoff_gate_rx") {
190 insert_backoff_gate_rx(&mut sub_graph)?;
191 }
192 if on("insert_peer_health_gate_tx") {
193 insert_peer_health_gate_tx(&mut sub_graph)?;
194 }
195 if on("insert_backoff_gate_tx") {
196 insert_backoff_gate_tx(&mut sub_graph)?;
197 }
198 if on("insert_async_deadlines") {
199 insert_async_deadlines(&mut sub_graph)?;
200 }
201
202 if on("validate_runtime_complete") {
203 validate_runtime_complete(&sub_graph)?;
204 }
205
206 let (composite_name, mut partition_function) =
207 split_partition(&target_function, role.clone(), &sub_graph, target_name);
208 partition_function.name = composite_name;
209 let mut all_hoisted = hoisted;
210 all_hoisted.extend(shared_functions.iter().cloned());
211 let model = wrap_as_model(partition_function, all_hoisted);
212 verify_no_dangling_calls(&model)?;
213 models.push(model);
214 }
215
216 Ok(models)
217}
218
219use bb_ir::proto::function_to_graph_view;
220
221fn merge_graph_into_function(mut function: FunctionProto, graph: GraphProto) -> FunctionProto {
222 function.node = graph.node;
223 function
224}
225
226fn split_partition(
227 base: &FunctionProto,
228 role: String,
229 sub_graph: &GraphProto,
230 module_name: &str,
231) -> (String, FunctionProto) {
232 let mut function = base.clone();
233 function.node = sub_graph.node.clone();
234 let base_name = if role == "@default" || role == bb_ir::peer_class::SELF_CLASS {
235 module_name.to_string()
236 } else if role == module_name || role.starts_with(&format!("{module_name}_")) {
237 role
238 } else {
239 format!("{module_name}_{role}")
240 };
241 let content_hash = crate::function_dedup::hash_node_bodies(&sub_graph.node);
246 let composite_name = format!("{base_name}#{content_hash:016x}");
247 (composite_name, function)
248}
249
250fn wrap_as_model(function: FunctionProto, hoisted: Vec<FunctionProto>) -> ModelProto {
251 let mut functions = Vec::with_capacity(1 + hoisted.len());
252 functions.push(function);
253 functions.extend(hoisted);
254 ModelProto {
255 functions,
256 ..Default::default()
257 }
258}
259
260fn run_type_solver(
285 graph: &mut GraphProto,
286 strict: bool,
287) -> Result<crate::type_solver::TypeSolution, CompileError> {
288 let decl_for_op = |_: &str, _: &str| -> Option<&'static bb_ir::atomic::AtomicOpDecl> { None };
289
290 let mut solver = crate::type_solver::TypeSolver::from_graph(graph, decl_for_op)
291 .map_err(CompileError::from)?;
292 solver.seed_from_value_info(graph);
293 let solution = if strict {
294 solver.solve_strict().map_err(CompileError::from)?
295 } else {
296 solver.solve().map_err(CompileError::from)?
297 };
298 crate::type_solver::TypeSolver::apply_solution_to_value_info(graph, &solution);
299 Ok(solution)
300}
301
302pub(crate) fn check_wire_edge_types(
311 graph: &GraphProto,
312 solution: &crate::type_solver::TypeSolution,
313) -> Result<(), CompileError> {
314 const SYNTHESIZED_FROM_KEY: &str = "ai.bytesandbrains.synthesized_from_send";
315 const WIRE_DOMAIN: &str = "ai.bytesandbrains.wire";
316
317 for node in &graph.node {
318 if node.domain != WIRE_DOMAIN || node.op_type != "Recv" {
319 continue;
320 }
321 let Some(src_val) = node
324 .metadata_props
325 .iter()
326 .find(|p| p.key == SYNTHESIZED_FROM_KEY)
327 .map(|p| p.value.as_str())
328 else {
329 continue;
330 };
331 let Some(dst_val) = node.output.first().filter(|s| !s.is_empty()) else {
333 continue;
334 };
335
336 let Some(actual_node) = solution.type_of(src_val) else {
337 continue;
338 };
339 let Some(expected_node) = solution.type_of(dst_val) else {
340 continue;
341 };
342
343 if !actual_node.is_concrete() || !expected_node.is_concrete() {
344 continue;
345 }
346 if std::ptr::eq(actual_node, expected_node) {
347 continue;
348 }
349
350 return Err(CompileError::IncompatibleStorageOnEdge {
351 src: src_val.to_string(),
352 dst: dst_val.to_string(),
353 expected_id: expected_node.id,
354 actual_id: actual_node.id,
355 });
356 }
357 Ok(())
358}