use std::any::Any;
use std::collections::{HashMap, HashSet};
use bb_ir::ids::PeerId;
use bb_ir::keys::{
parse_binding_key, parse_binding_value, read_model_metadata, BINDING_KEY_PREFIX,
COMPILED_CURRENT_VERSION, COMPILED_KEY,
};
use bb_ir::proto::onnx::{FunctionProto, ModelProto};
use bb_ir::registry::find_concrete_component;
use bb_runtime::concrete::ComponentHandle;
use bb_runtime::engine::dispatch_entry::FunctionKey;
use bb_runtime::framework::Address;
use bb_runtime::ids::ComponentRef;
use bb_runtime::node::Node;
use bb_runtime::registry::ComponentRole as R;
use bb_runtime::registry::{dispatcher_for, roles_for_component};
pub struct Config {
configs: HashMap<String, Box<dyn Any>>,
}
impl Config {
pub fn new() -> Self {
Self {
configs: HashMap::new(),
}
}
pub fn with<C: Any + 'static>(mut self, slot: impl Into<String>, config: C) -> Self {
self.configs.insert(slot.into(), Box::new(config));
self
}
}
impl Default for Config {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub enum InstallError {
NotCompiled,
IncompatibleCompiledVersion {
got: String,
expected: &'static str,
},
UnknownTarget {
target: String,
available: Vec<String>,
},
InvalidBindingTable {
key: String,
detail: String,
},
UnregisteredConcrete {
type_name: String,
},
MissingConfig {
slot: String,
type_name: String,
},
ConfigTypeMismatch {
slot: String,
type_name: String,
detail: String,
},
ConstructionFailed {
slot: String,
type_name: String,
detail: String,
},
SlotBindingConflict {
slot: String,
conflicts: Vec<(String, String, String)>,
},
EmptyTargets,
}
impl std::fmt::Display for InstallError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::NotCompiled => write!(
f,
"install: ModelProto carries no `{COMPILED_KEY}` metadata stamp; \
only `bb_compiler::Compiler::compile()` output may be installed",
),
Self::IncompatibleCompiledVersion { got, expected } => write!(
f,
"install: ModelProto was compiled against `{got}` but this framework \
requires `{expected}`; recompile",
),
Self::UnknownTarget { target, available } => write!(
f,
"install: target function `{target}` not found in model.functions[]; \
available targets: {available:?}",
),
Self::InvalidBindingTable { key, detail } => write!(
f,
"install: binding metadata entry `{key}` is malformed: {detail}",
),
Self::UnregisteredConcrete { type_name } => write!(
f,
"install: artifact references `{type_name}` but no \
`inventory::submit!` carrier registers it in this binary",
),
Self::MissingConfig { slot, type_name } => write!(
f,
"install: slot `{slot}` expected a config for `{type_name}` \
(its `Config` associated type is not `()`); add \
`Config::new().with(\"{slot}\", <config>)`",
),
Self::ConfigTypeMismatch {
slot,
type_name,
detail,
} => write!(
f,
"install: slot `{slot}` received a config whose type does not \
match `{type_name}`'s `Config` associated type ({detail})",
),
Self::ConstructionFailed {
slot,
type_name,
detail,
} => write!(
f,
"install: `{type_name}::new` for slot `{slot}` returned an error: {detail}",
),
Self::SlotBindingConflict { slot, conflicts } => {
write!(
f,
"install: slot `{slot}` has conflicting bindings across targets:",
)?;
for (target, type_name, role) in conflicts {
write!(f, "\n target `{target}` → `{type_name}` (role `{role}`)")?;
}
Ok(())
}
Self::EmptyTargets => write!(
f,
"install: targets slice is empty; supply at least one target name",
),
}
}
}
impl std::error::Error for InstallError {}
pub fn install(
peer_id: PeerId,
addresses: Vec<Address>,
model: ModelProto,
targets: &[&str],
config: Config,
) -> Result<Node, InstallError> {
bb_ops::link_force();
if targets.is_empty() {
return Err(InstallError::EmptyTargets);
}
verify_compilation_stamp(&model)?;
let mut resolved_target_names: Vec<String> = Vec::with_capacity(targets.len());
let mut per_target_bindings: Vec<Vec<ResolvedBinding>> = Vec::with_capacity(targets.len());
for raw in targets {
let target_function = find_target(&model, raw)?;
let resolved_name = target_function.name.clone();
let bindings = parse_target_bindings(&model, &resolved_name)?;
resolved_target_names.push(resolved_name);
per_target_bindings.push(bindings);
}
let unified = dedupe_bindings_across_targets(&resolved_target_names, &per_target_bindings)?;
let mut node = Node::new(peer_id, addresses);
let mut registered_dispatchers: HashSet<(&'static str, &'static str)> = HashSet::new();
let unit_default: &dyn Any = &();
for (idx, binding) in unified.iter().enumerate() {
let next_cref = idx as u32;
let entry = find_concrete_component(&binding.type_name).ok_or_else(|| {
InstallError::UnregisteredConcrete {
type_name: binding.type_name.clone(),
}
})?;
let supplied: &dyn Any = config
.configs
.get(&binding.slot)
.map(|b| b.as_ref())
.unwrap_or(unit_default);
let instance = (entry.construct_fn)(supplied).map_err(|e| {
if e.detail.starts_with("config type mismatch:") {
InstallError::ConfigTypeMismatch {
slot: binding.slot.clone(),
type_name: binding.type_name.clone(),
detail: e.detail,
}
} else {
InstallError::ConstructionFailed {
slot: binding.slot.clone(),
type_name: binding.type_name.clone(),
detail: e.detail,
}
}
})?;
register_dispatchers_for(
node.engine_install_handle(),
entry.type_name,
&mut registered_dispatchers,
);
let cref = ComponentRef::from(next_cref);
let instance_id = next_cref;
let engine = node.engine_install_handle();
engine.register_component(cref, instance);
engine.bind_slot(binding.slot.clone(), cref);
if let Some(slot_id) = binding.slot_id {
engine.bind_slot_id(slot_id, cref);
if let Some(role) = parse_role(&binding.role) {
engine.bind_slot_id_with_role(slot_id, role, cref);
}
}
stamp_component_roles(engine, entry.type_name, cref);
node.push_linked_component(ComponentHandle {
type_name: entry.type_name,
package: entry.package,
instance_id,
serialize_fn: entry.serialize_fn,
restore_fn: entry.restore_fn,
state_bytes: Vec::new(),
});
}
install_targets(node.engine_install_handle(), &model, &resolved_target_names);
node.engine_install_handle().resolve_dispatch();
node.set_model(model);
for resolved in &resolved_target_names {
node.register_module(resolved.clone());
}
Ok(node)
}
fn verify_compilation_stamp(model: &ModelProto) -> Result<(), InstallError> {
let Some(got) = read_model_metadata(model, COMPILED_KEY) else {
return Err(InstallError::NotCompiled);
};
if got != COMPILED_CURRENT_VERSION {
return Err(InstallError::IncompatibleCompiledVersion {
got: got.to_string(),
expected: COMPILED_CURRENT_VERSION,
});
}
Ok(())
}
fn find_target<'a>(model: &'a ModelProto, target: &str) -> Result<&'a FunctionProto, InstallError> {
if let Some(exact) = model.functions.iter().find(|f| f.name == target) {
return Ok(exact);
}
let prefix = format!("{target}#");
if let Some(suffixed) = model.functions.iter().find(|f| f.name.starts_with(&prefix)) {
return Ok(suffixed);
}
let available = model
.functions
.iter()
.map(|f| f.name.clone())
.collect::<Vec<_>>();
Err(InstallError::UnknownTarget {
target: target.to_string(),
available,
})
}
#[derive(Debug, Clone)]
struct ResolvedBinding {
slot: String,
type_name: String,
slot_id: Option<u32>,
role: String,
}
fn parse_target_bindings(
model: &ModelProto,
target_name: &str,
) -> Result<Vec<ResolvedBinding>, InstallError> {
let mut out = Vec::new();
for entry in &model.metadata_props {
if !entry.key.starts_with(BINDING_KEY_PREFIX) {
continue;
}
let Some((target, slot)) = parse_binding_key(&entry.key) else {
return Err(InstallError::InvalidBindingTable {
key: entry.key.clone(),
detail: "key not in `ai.bytesandbrains.binding.<target>.<slot>` form".into(),
});
};
if target != target_name {
continue;
}
let Some((role, type_name, slot_id)) = parse_binding_value(&entry.value) else {
return Err(InstallError::InvalidBindingTable {
key: entry.key.clone(),
detail: format!(
"value `{}` not in `<role>|<TYPE_NAME>|<slot_id|-1>` form",
entry.value
),
});
};
let slot_id = if slot_id < 0 {
None
} else {
Some(slot_id as u32)
};
out.push(ResolvedBinding {
slot: slot.to_string(),
type_name: type_name.to_string(),
slot_id,
role: role.to_string(),
});
}
Ok(out)
}
fn register_dispatchers_for(
engine: &mut bb_runtime::engine::Engine,
type_name: &'static str,
registered: &mut HashSet<(&'static str, &'static str)>,
) {
for role in roles_for_component(type_name) {
let key = (type_name, role_as_str(role));
if !registered.insert(key) {
continue;
}
if let Some(register_fn) = dispatcher_for(type_name, role) {
register_fn(engine);
}
}
let bootstrap_key = (type_name, "bootstrap");
if registered.insert(bootstrap_key) {
if let Some(register_fn) = bb_runtime::registry::bootstrap_dispatcher_for(type_name) {
register_fn(engine);
}
}
}
fn stamp_component_roles(
engine: &mut bb_runtime::engine::Engine,
type_name: &str,
cref: ComponentRef,
) {
let roles: std::collections::HashSet<bb_runtime::registry::ComponentRole> =
roles_for_component(type_name).collect();
if !roles.is_empty() {
engine.set_component_roles(cref, roles);
}
}
fn install_targets(
engine: &mut bb_runtime::engine::Engine,
model: &ModelProto,
resolved_target_names: &[String],
) {
let target_set: HashSet<&str> = resolved_target_names.iter().map(|s| s.as_str()).collect();
let mut entry_point_keys: Vec<FunctionKey> = Vec::with_capacity(resolved_target_names.len());
for resolved in resolved_target_names {
let Some(entry) = model
.functions
.iter()
.find(|f| f.name == *resolved)
.cloned()
else {
continue;
};
entry_point_keys.push((
entry.domain.clone(),
entry.name.clone(),
entry.overload.clone(),
));
engine.install_graph(entry.name.clone(), entry);
}
let sub_functions: Vec<bb_ir::proto::onnx::FunctionProto> = model
.functions
.iter()
.filter(|f| !target_set.contains(f.name.as_str()))
.cloned()
.collect();
engine.install_function_library(&sub_functions, &entry_point_keys);
}
#[derive(Debug, Clone)]
struct UnifiedBinding {
slot: String,
type_name: String,
slot_id: Option<u32>,
role: String,
}
fn dedupe_bindings_across_targets(
target_names: &[String],
per_target_bindings: &[Vec<ResolvedBinding>],
) -> Result<Vec<UnifiedBinding>, InstallError> {
let mut order: Vec<String> = Vec::new();
let mut by_slot: HashMap<String, UnifiedBinding> = HashMap::new();
let mut contributors: HashMap<String, Vec<(String, String, String)>> = HashMap::new();
for (target_idx, bindings) in per_target_bindings.iter().enumerate() {
let target_name = &target_names[target_idx];
for binding in bindings {
contributors.entry(binding.slot.clone()).or_default().push((
target_name.clone(),
binding.type_name.clone(),
binding.role.clone(),
));
match by_slot.get(&binding.slot) {
None => {
order.push(binding.slot.clone());
by_slot.insert(
binding.slot.clone(),
UnifiedBinding {
slot: binding.slot.clone(),
type_name: binding.type_name.clone(),
slot_id: binding.slot_id,
role: binding.role.clone(),
},
);
}
Some(existing) => {
if existing.type_name != binding.type_name || existing.role != binding.role {
return Err(InstallError::SlotBindingConflict {
slot: binding.slot.clone(),
conflicts: contributors.remove(&binding.slot).unwrap_or_default(),
});
}
}
}
}
}
Ok(order
.into_iter()
.map(|slot| by_slot.remove(&slot).expect("slot inserted above"))
.collect())
}
fn role_as_str(role: bb_runtime::registry::ComponentRole) -> &'static str {
match role {
R::Index => "Index",
R::Aggregator => "Aggregator",
R::Model => "Model",
R::Codec => "Codec",
R::DataSource => "DataSource",
R::PeerSelector => "PeerSelector",
R::Backend => "Backend",
R::Protocol => "Protocol",
}
}
fn parse_role(role: &str) -> Option<bb_runtime::registry::ComponentRole> {
match role {
"Index" => Some(R::Index),
"Aggregator" => Some(R::Aggregator),
"Model" => Some(R::Model),
"Codec" => Some(R::Codec),
"DataSource" => Some(R::DataSource),
"PeerSelector" => Some(R::PeerSelector),
"Backend" => Some(R::Backend),
"Protocol" => Some(R::Protocol),
_ => None,
}
}