use std::collections::HashMap;
use bb_ir::proto::onnx::NodeProto;
use bb_runtime::atomic::DispatchResult;
use bb_runtime::bus::OpError;
use bb_runtime::envelope::{SlotFill, WireCorrelation, WireEnvelope};
use bb_runtime::runtime::RuntimeResourceRef;
use bb_runtime::slot_value::SlotValue;
use bb_runtime::syscall::values::{BytesValue, PeerIdVecValue, WireReqIdValue};
/// Operator domain string under which the `Send` and `Recv` ops in this file are registered.
pub const WIRE_DOMAIN: &str = "ai.bytesandbrains.wire";
/// Version number paired with [`WIRE_DOMAIN`].
// NOTE(review): not referenced in the visible part of this file; presumably the
// opset version of the wire domain — confirm against the consumer.
pub const WIRE_VERSION: i64 = 1;
/// Field-less marker type named after the `Send` op.
// NOTE(review): not referenced in the visible part of this file — confirm where it is used.
pub struct SendOp;
/// Field-less marker type named after the `Recv` op.
// NOTE(review): not referenced in the visible part of this file — confirm where it is used.
pub struct RecvOp;
/// Handler registered for the `Send` op in [`WIRE_DOMAIN`].
///
/// The node proto is ignored; `inputs` and `ctx` are forwarded unchanged to
/// [`invoke_send`], whose result (or error) is returned as-is.
pub fn invoke(
_node: &NodeProto,
inputs: &[(&str, &dyn SlotValue)],
ctx: &mut RuntimeResourceRef<'_>,
) -> Result<DispatchResult, OpError> {
invoke_send(inputs, ctx)
}
/// Serialises `value` to its wire bytes.
///
/// # Errors
/// When `to_wire_bytes` fails, returns an `ExecutionFailed` [`OpError`] with
/// reason `wire_encode_failed`; the detail message names the slot (`name`) and
/// includes the underlying encoder error.
fn encode_or_error(name: &str, value: &dyn SlotValue) -> Result<Vec<u8>, OpError> {
    match value.to_wire_bytes() {
        Ok(bytes) => Ok(bytes),
        Err(e) => Err(OpError {
            kind: bb_runtime::bus::OpErrorKind::ExecutionFailed,
            reason: "wire_encode_failed",
            detail: format!("wire encode of slot `{name}` failed: {e}"),
        }),
    }
}
/// Handler registered for the `Recv` op in [`WIRE_DOMAIN`].
///
/// All three arguments are ignored. Always succeeds with an immediate dispatch
/// result that carries no output slots.
pub fn invoke_recv(
_node: &NodeProto,
_inputs: &[(&str, &dyn SlotValue)],
_ctx: &mut RuntimeResourceRef<'_>,
) -> Result<DispatchResult, OpError> {
Ok(DispatchResult::Immediate(vec![]))
}
fn build_envelope(
dest_peer_addresses: Vec<Vec<u8>>,
fills: Vec<SlotFill>,
correlation: WireCorrelation,
) -> WireEnvelope {
WireEnvelope {
dest_peer_addresses,
fills,
correlation: Some(correlation),
remaining_deadline_ns: 0,
edge_rtt_reports: Vec::new(),
..Default::default()
}
}
/// Builds and queues one outbound wire envelope per resolvable destination peer.
///
/// Steps, in order:
/// 1. Decode the destination peers ([`extract_dest_peers`]) and encode the
///    payload inputs into slot fills ([`collect_fills`]).
/// 2. Choose the wire request id: a non-zero inbound id is reused (this node is
///    then treated as a forwarder); otherwise the
///    `ai.bytesandbrains.wire.chain_id` node-metadata entry is used when it
///    parses as a `u64`, else a fresh token is minted.
/// 3. Choose the outbound deadline, first match wins:
///    inbound remaining deadline minus the time elapsed since arrival
///    (saturating at zero) → the `deadline_ns` node attribute → an estimate
///    from `ctx.estimate_wire_budget_ns` for the first destination → `0`.
/// 4. Unless forwarding, register the request as in flight against the site of
///    the first destination peer.
/// 5. For every destination peer, push an envelope onto `ctx.net.outbound`, or
///    — when the peer has no (non-empty) address set — record and publish a
///    peer-resolve failure.
///
/// # Returns
/// An immediate dispatch result with two outputs: `data` (a trigger value) and
/// `handle` (the wire request id).
///
/// # Errors
/// Propagates the [`OpError`] from [`collect_fills`] when an input cannot be
/// wire-encoded. Unresolvable peers are reported through the bus, not as an error.
pub(crate) fn invoke_send(
    inputs: &[(&str, &dyn SlotValue)],
    ctx: &mut RuntimeResourceRef<'_>,
) -> Result<DispatchResult, OpError> {
    let dest_peers = extract_dest_peers(inputs, ctx);
    let fills = collect_fills(inputs, ctx)?;

    // A non-zero inbound request id means we are relaying someone else's
    // request: keep their id and do not track the request ourselves.
    let (req_id_u64, is_forwarder) = match ctx.current.inbound.wire_req_id {
        Some(inbound) if inbound != 0 => (inbound, true),
        _ => {
            let chain_id =
                read_metadata_u64(ctx.current.node_metadata, "ai.bytesandbrains.wire.chain_id");
            let token = chain_id.unwrap_or_else(|| ctx.net.requests.mint_token().as_u64());
            (token, false)
        }
    };

    let chain_ctx = ctx.read_chain_context();

    let outbound_deadline_ns: u64 = if let (Some(inbound_remaining), Some(arrival_ns)) = (
        ctx.current.inbound.remaining_deadline_ns,
        ctx.current.inbound.arrival_ns,
    ) {
        // Charge the time spent on this node against the inherited budget.
        let now_ns = ctx.time.scheduler.now_ns();
        let elapsed = now_ns.saturating_sub(arrival_ns);
        inbound_remaining.saturating_sub(elapsed)
    } else if let Some(static_deadline_ns) =
        read_attribute_u64(ctx.current.node_attributes, "deadline_ns")
    {
        static_deadline_ns
    } else if let Some(first_peer) = dest_peers.first().copied() {
        ctx.estimate_wire_budget_ns(
            peer_to_site(first_peer),
            chain_ctx,
            bb_ir::syscall_ids::DEFAULT_PER_HOP_BUDGET_NS,
        )
    } else {
        0
    };

    if let Some(first_peer) = dest_peers.first().copied() {
        if !is_forwarder {
            let target_site = peer_to_site(first_peer);
            // The TTL must be non-zero; a zero deadline falls back to the
            // smallest non-zero value (1). `NonZeroU64::MIN` is that value and
            // needs no `unsafe`.
            let ttl = std::num::NonZeroU64::new(outbound_deadline_ns)
                .unwrap_or(std::num::NonZeroU64::MIN);
            ctx.net.requests.register_in_flight(
                req_id_u64,
                ctx.time.scheduler.now_ns(),
                target_site,
                chain_ctx,
                ttl,
                None,
            );
        }
    }

    let kind = if req_id_u64 != 0 {
        bb_runtime::envelope::CorrelationKind::Request as i32
    } else {
        bb_runtime::envelope::CorrelationKind::None as i32
    };

    let src_peer_addresses: Vec<Vec<u8>> =
        ctx.local_addresses().iter().map(|a| a.to_bytes()).collect();

    for peer in &dest_peers {
        // An empty address set is treated the same as an unknown peer.
        let resolved: Option<Vec<Vec<u8>>> = ctx
            .peers
            .addresses
            .lookup(*peer)
            .filter(|s| !s.is_empty())
            .map(|s| s.iter().map(|a| a.to_bytes()).collect());
        match resolved {
            Some(dest_peer_addresses) => {
                let mut env = build_envelope(
                    dest_peer_addresses,
                    fills.clone(),
                    WireCorrelation {
                        kind,
                        wire_req_id: req_id_u64,
                    },
                );
                env.remaining_deadline_ns = outbound_deadline_ns;
                env.src_peer_addresses = src_peer_addresses.clone();
                ctx.net.outbound.push(env);
            }
            None => {
                ctx.net
                    .pending_peer_resolve_failures
                    .push((Some(*peer), ctx.current.op_ref));
                ctx.bus.publish(bb_runtime::bus::NodeEvent::Infra(
                    bb_runtime::bus::InfraEvent::PeerResolveFailure {
                        peer: Some(*peer),
                        op_ref: ctx.current.op_ref,
                    },
                ));
            }
        }
    }

    Ok(DispatchResult::Immediate(vec![
        (
            "data".to_string(),
            Box::new(bb_runtime::syscall::values::TriggerValue) as Box<dyn SlotValue>,
        ),
        (
            "handle".to_string(),
            Box::new(WireReqIdValue(req_id_u64)) as Box<dyn SlotValue>,
        ),
    ]))
}
/// Looks up the first metadata entry whose key equals `key` and parses its
/// value as a `u64`.
///
/// Returns `None` when no entry has that key or when the first matching
/// entry's value does not parse; later entries with the same key are not tried.
fn read_metadata_u64(
    props: &[bb_ir::proto::onnx::StringStringEntryProto],
    key: &str,
) -> Option<u64> {
    let entry = props.iter().find(|candidate| candidate.key == key)?;
    entry.value.parse::<u64>().ok()
}
/// Returns the integer field `i` of the first attribute named `name`, with
/// negative values clamped to `0`.
///
/// Returns `None` when no attribute has that name.
fn read_attribute_u64(attrs: &[bb_ir::proto::onnx::AttributeProto], name: &str) -> Option<u64> {
    let attr = attrs.iter().find(|candidate| candidate.name == name)?;
    // Clamp before the cast so a negative value becomes 0 instead of wrapping.
    let non_negative = attr.i.max(0);
    Some(non_negative as u64)
}
/// Derives a site id for `peer` by hashing the peer's digest with `fnv1a_64`
/// and converting the hash into a `NodeSiteId`.
fn peer_to_site(peer: bb_runtime::ids::PeerId) -> bb_runtime::ids::NodeSiteId {
    let digest_hash = bb_runtime::slot_value::fnv1a_64(peer.digest());
    bb_runtime::ids::NodeSiteId::from(digest_hash)
}
/// Extracts the destination peer list from the op inputs.
///
/// Lookup order:
/// 1. Inputs named `dest`, `dest_peer`, `peer`, `peers` or `peer_id`, in input
///    order; the first one that decodes wins.
/// 2. Otherwise the input at position 1, if it decodes.
/// 3. Otherwise an empty list.
///
/// An input "decodes" when it is a `PeerIdVecValue`, or a `BytesValue` whose
/// bytes bincode-deserialise into a `PeerIdVecValue`. `_ctx` is unused.
pub(crate) fn extract_dest_peers(
    inputs: &[(&str, &dyn SlotValue)],
    _ctx: &RuntimeResourceRef<'_>,
) -> Vec<bb_runtime::ids::PeerId> {
    /// Attempts to read a peer list out of a single slot value.
    fn decode_peers(slot: &dyn SlotValue) -> Option<Vec<bb_runtime::ids::PeerId>> {
        if let Some(typed) = slot.as_any().downcast_ref::<PeerIdVecValue>() {
            return Some(typed.0.clone());
        }
        let raw = slot.as_any().downcast_ref::<BytesValue>()?;
        bincode::deserialize::<PeerIdVecValue>(&raw.0)
            .ok()
            .map(|decoded| decoded.0)
    }

    let from_named_input = inputs
        .iter()
        .filter(|(name, _)| matches!(*name, "dest" | "dest_peer" | "peer" | "peers" | "peer_id"))
        .find_map(|(_, slot)| decode_peers(*slot));

    from_named_input
        // Positional fallback: the second input, regardless of its name.
        .or_else(|| inputs.get(1).and_then(|(_, slot)| decode_peers(*slot)))
        .unwrap_or_default()
}
/// Node-attribute name prefix; the remainder of the attribute name is the input
/// slot whose destination suffix the attribute's bytes (`s`) provide.
const DEST_SUFFIX_ATTR_PREFIX: &str = "ai.bytesandbrains.dest_suffix.";
/// Node-attribute name prefix; the remainder of the attribute name is the input
/// slot whose trigger-only flag the attribute sets (true when `s` is non-empty
/// or `i` is non-zero).
const TRIGGER_ONLY_ATTR_PREFIX: &str = "ai.bytesandbrains.trigger_only.";
/// Encodes the payload inputs of a send into [`SlotFill`]s.
///
/// Inputs named `dest`, `dest_peer`, `peer`, `peers`, `peer_id` or `req_id`,
/// and inputs whose name ends in `_suffix`, are not payloads and are skipped.
/// For every remaining input `name`:
/// * `payload` is the input's wire encoding;
/// * `dest_suffix` comes from the node attribute
///   `ai.bytesandbrains.dest_suffix.<name>` if present, else from the encoded
///   `<name>_suffix` input if present, else it is empty;
/// * `trigger_only` comes from the node attribute
///   `ai.bytesandbrains.trigger_only.<name>` if present, else it is true
///   exactly when the payload is empty;
/// * `type_hash` is the input's own type hash.
///
/// # Errors
/// Returns the first wire-encoding failure; all `*_suffix` inputs are encoded
/// before any payload input.
fn collect_fills(
    inputs: &[(&str, &dyn SlotValue)],
    ctx: &RuntimeResourceRef<'_>,
) -> Result<Vec<SlotFill>, OpError> {
    // Per-slot overrides declared as node attributes, keyed by slot name.
    let mut suffix_by_attr: HashMap<String, Vec<u8>> = HashMap::new();
    let mut trigger_by_attr: HashMap<String, bool> = HashMap::new();
    for attr in ctx.current.node_attributes {
        if let Some(slot_name) = attr.name.strip_prefix(DEST_SUFFIX_ATTR_PREFIX) {
            suffix_by_attr.insert(slot_name.to_string(), attr.s.clone());
        }
        if let Some(slot_name) = attr.name.strip_prefix(TRIGGER_ONLY_ATTR_PREFIX) {
            let flag = !attr.s.is_empty() || attr.i != 0;
            trigger_by_attr.insert(slot_name.to_string(), flag);
        }
    }

    // Suffixes supplied at runtime through `<slot>_suffix` inputs.
    let mut suffix_by_input: HashMap<&str, Vec<u8>> = HashMap::new();
    for (name, slot) in inputs {
        if let Some(slot_name) = name.strip_suffix("_suffix") {
            let encoded = encode_or_error(name, *slot)?;
            suffix_by_input.insert(slot_name, encoded);
        }
    }

    inputs
        .iter()
        .filter(|(name, _)| {
            let is_routing_input = matches!(
                *name,
                "dest" | "dest_peer" | "peer" | "peers" | "peer_id" | "req_id"
            );
            !(name.ends_with("_suffix") || is_routing_input)
        })
        .map(|(name, slot)| -> Result<SlotFill, OpError> {
            let payload = encode_or_error(name, *slot)?;
            // Attribute-declared suffix wins over the input-supplied one.
            let dest_suffix = match suffix_by_attr.get(*name) {
                Some(from_attr) => from_attr.clone(),
                None => suffix_by_input.get(*name).cloned().unwrap_or_default(),
            };
            let trigger_only = match trigger_by_attr.get(*name) {
                Some(declared) => *declared,
                None => payload.is_empty(),
            };
            let type_hash = slot.type_hash();
            Ok(SlotFill {
                dest_suffix,
                payload,
                trigger_only,
                type_hash,
            })
        })
        .collect()
}
use bb_runtime::registry::OpRegistration as _BbOpsSyscallReg;
// Registers `invoke` as the syscall-kind handler for op type `Send` in the wire domain.
inventory::submit! {
_BbOpsSyscallReg {
domain: WIRE_DOMAIN,
op_type: "Send",
invoke,
kind: bb_runtime::registry::RegistrationKind::Syscall,
}
}
// Registers `invoke_recv` as the syscall-kind handler for op type `Recv` in the wire domain.
inventory::submit! {
_BbOpsSyscallReg {
domain: WIRE_DOMAIN,
op_type: "Recv",
invoke: invoke_recv,
kind: bb_runtime::registry::RegistrationKind::Syscall,
}
}