bb_ops/network/wire/mod.rs
1//! `ai.bytesandbrains.wire v1` opset - engine-native.
2//!
3//! Two ops, exactly one of which the user authors:
4//!
//! - `Send` (user-facing): takes one or more payload inputs plus a
//!   typed peer-list input carrying [`PeerId`](bb_runtime::ids::PeerId)s;
//!   resolves each peer to its address list through
//!   `ctx.peers.addresses`, builds one `SlotFill` per non-peer input
//!   from the compiler-stamped `dest_suffix` attributes, ships one
//!   envelope per resolvable peer, and returns `(data, handle)` - a
//!   structural-placeholder `Trigger` plus the `wire_req_id` (reused
//!   from the inbound request when forwarding, otherwise taken from the
//!   compiler-stamped `chain_id` or freshly minted).
12//!
13//! - `Recv` (framework-synthesized): emitted on receiver partitions
14//! by [bb_compiler::synthesize_wire_recvs()]. Pure structural
15//! placeholder so the receiver's `FunctionProto` stays a closed
16//! DAG and the consumer's input value name resolves to a
17//! `NodeSiteId` the inbound `deliver_fill` writes into. The
18//! dispatch handler returns no outputs - Recv never lands on the
19//! frontier in normal flow.
20//!
21//! Both ops register through the same path as every other syscall
22//! (`Engine::register_syscall`). There is no separate `WireRuntime`
23//! binding - wire is engine infrastructure, not a user role.
24
25use std::collections::HashMap;
26
27use bb_ir::proto::onnx::NodeProto;
28use bb_runtime::atomic::DispatchResult;
29use bb_runtime::bus::OpError;
30use bb_runtime::envelope::{SlotFill, WireCorrelation, WireEnvelope};
31use bb_runtime::runtime::RuntimeResourceRef;
32use bb_runtime::slot_value::SlotValue;
33use bb_runtime::syscall::values::{BytesValue, PeerIdVecValue, WireReqIdValue};
34
35/// Wire opset domain.
36pub const WIRE_DOMAIN: &str = "ai.bytesandbrains.wire";
37
38/// Wire opset version.
39pub const WIRE_VERSION: i64 = 1;
40
41/// Marker struct for `register_syscall::<SendOp>`.
42pub struct SendOp;
43
44/// Marker struct for `register_syscall::<RecvOp>`.
45pub struct RecvOp;
46
47/// `Send` dispatch entry point matching `StatelessInvokeFn`.
48pub fn invoke(
49 _node: &NodeProto,
50 inputs: &[(&str, &dyn SlotValue)],
51 ctx: &mut RuntimeResourceRef<'_>,
52) -> Result<DispatchResult, OpError> {
53 invoke_send(inputs, ctx)
54}
55
56/// Encode a single slot value to its wire bytes. Surfaces encode
57/// failures as `OpError::ExecutionFailed` carrying the slot name
58/// and the underlying error, so a malformed payload drops the op
59/// instead of crashing the node.
60fn encode_or_error(name: &str, value: &dyn SlotValue) -> Result<Vec<u8>, OpError> {
61 value.to_wire_bytes().map_err(|e| OpError {
62 kind: bb_runtime::bus::OpErrorKind::ExecutionFailed,
63 reason: "wire_encode_failed",
64 detail: format!("wire encode of slot `{name}` failed: {e}"),
65 })
66}
67
68/// `Recv` dispatch entry point matching `StatelessInvokeFn`. Pure
69/// structural placeholder; downstream consumers are pushed onto the
70/// frontier when inbound data-plane delivery seeds the slot, not by
71/// this dispatch.
72pub fn invoke_recv(
73 _node: &NodeProto,
74 _inputs: &[(&str, &dyn SlotValue)],
75 _ctx: &mut RuntimeResourceRef<'_>,
76) -> Result<DispatchResult, OpError> {
77 Ok(DispatchResult::Immediate(vec![]))
78}
79
80/// Construct an envelope with the resolved address list, one or
81/// more fills and the given correlation. `dest_peer_addresses` is
82/// the resolved snapshot of `AddressBook::lookup(peer)` at dispatch
83/// time per `docs/ADDRESSING.md`. The host's transport adapter
84/// picks one of these entries based on its networking capabilities.
85/// Each fill carries its own per-slot multiaddr `dest_suffix` for
86/// *intra-node* slot routing only.
87fn build_envelope(
88 dest_peer_addresses: Vec<Vec<u8>>,
89 fills: Vec<SlotFill>,
90 correlation: WireCorrelation,
91) -> WireEnvelope {
92 WireEnvelope {
93 dest_peer_addresses,
94 fills,
95 correlation: Some(correlation),
96 remaining_deadline_ns: 0,
97 edge_rtt_reports: Vec::new(),
98 ..Default::default()
99 }
100}
101
102pub(crate) fn invoke_send(
103 inputs: &[(&str, &dyn SlotValue)],
104 ctx: &mut RuntimeResourceRef<'_>,
105) -> Result<DispatchResult, OpError> {
106 let dest_peers = extract_dest_peers(inputs, ctx);
107 let fills = collect_fills(inputs, ctx)?;
108
109 // Decide ORIGINATOR vs FORWARDER role:
110 // - Forwarder: this Send fires inside an inbound-delivery cascade
111 // AND the inbound envelope carried a non-zero correlation
112 // wire_req_id. Reuse that token; skip register_in_flight.
113 // - Originator: read `chain_id` from the Send NodeProto's
114 // metadata (stamped by the compiler's `analyze_round_trips`
115 // pass) and use it as the wire_req_id; if absent, mint fresh.
116 // Register the in-flight entry so the eventual response back
117 // to this site is observable.
118 let (req_id_u64, is_forwarder) = match ctx.current.inbound.wire_req_id {
119 Some(inbound) if inbound != 0 => (inbound, true),
120 _ => {
121 // Originator path.
122 let chain_id =
123 read_metadata_u64(ctx.current.node_metadata, "ai.bytesandbrains.wire.chain_id");
124 let token = chain_id.unwrap_or_else(|| ctx.net.requests.mint_token().as_u64());
125 (token, false)
126 }
127 };
128
129 // Derive Dapper-style outbound `remaining_deadline_ns`. When
130 // forwarding inside a cascade, subtract elapsed local time from
131 // the inbound's remaining budget. Otherwise read the static
132 // `deadline_ns` attribute stamped by `derive_wire_deadlines`.
133 let chain_ctx = ctx.read_chain_context();
134 let mut outbound_deadline_ns: u64 = 0;
135 if let (Some(inbound_remaining), Some(arrival_ns)) = (
136 ctx.current.inbound.remaining_deadline_ns,
137 ctx.current.inbound.arrival_ns,
138 ) {
139 let now_ns = ctx.time.scheduler.now_ns();
140 let elapsed = now_ns.saturating_sub(arrival_ns);
141 outbound_deadline_ns = inbound_remaining.saturating_sub(elapsed);
142 } else if let Some(static_deadline_ns) =
143 read_attribute_u64(ctx.current.node_attributes, "deadline_ns")
144 {
145 outbound_deadline_ns = static_deadline_ns;
146 } else if let Some(first_peer) = dest_peers.first().copied() {
147 // Last resort: adaptive RTT-tracker estimate, indexed by the
148 // first peer's site (representative for fan-out).
149 outbound_deadline_ns = ctx.estimate_wire_budget_ns(
150 peer_to_site(first_peer),
151 chain_ctx,
152 bb_ir::syscall_ids::DEFAULT_PER_HOP_BUDGET_NS,
153 );
154 }
155
156 if let Some(first_peer) = dest_peers.first().copied() {
157 if !is_forwarder {
158 // Originator registers ONE in-flight entry per Send call
159 // (not per fan-out peer) — the `wire_req_id` is shared
160 // across all envelopes in the fan-out, and the first
161 // response satisfies the request. Forwarders skip;
162 // the upstream originator's entry tracks the chain.
163 //
164 // `ttl_ns` is `NonZeroU64`; if the deadline
165 // budget collapsed to 0 (a degenerate but possible
166 // outcome of the deadline cascade above), substitute the
167 // minimum 1 ns so the in-flight entry still expires
168 // rather than living forever.
169 let target_site = peer_to_site(first_peer);
170 let ttl = std::num::NonZeroU64::new(outbound_deadline_ns)
171 .unwrap_or(unsafe { std::num::NonZeroU64::new_unchecked(1) });
172 ctx.net.requests.register_in_flight(
173 req_id_u64,
174 ctx.time.scheduler.now_ns(),
175 target_site,
176 chain_ctx,
177 ttl,
178 None,
179 );
180 }
181 }
182
183 // Empty peer list → no envelopes (composes naturally with
184 // samplers that returned no peers this round).
185 let kind = if req_id_u64 != 0 {
186 bb_runtime::envelope::CorrelationKind::Request as i32
187 } else {
188 bb_runtime::envelope::CorrelationKind::None as i32
189 };
190 // Snapshot the sender's local-address list once per Send call.
191 // The receiver merges this into its AddressBook entry for the
192 // sender so future replies can dial back on any reachable
193 // interface; an empty Vec stamps zero entries and the receiver
194 // leaves its existing entry untouched.
195 let src_peer_addresses: Vec<Vec<u8>> =
196 ctx.local_addresses().iter().map(|a| a.to_bytes()).collect();
197
198 for peer in &dest_peers {
199 // Resolve the peer's address list. Per-peer failure surfaces
200 // as PeerResolveFailed but does NOT abort the fan-out — other
201 // destinations may still receive.
202 let resolved: Option<Vec<Vec<u8>>> = ctx
203 .peers
204 .addresses
205 .lookup(*peer)
206 .filter(|s| !s.is_empty())
207 .map(|s| s.iter().map(|a| a.to_bytes()).collect());
208 match resolved {
209 Some(dest_peer_addresses) => {
210 let mut env = build_envelope(
211 dest_peer_addresses,
212 fills.clone(),
213 WireCorrelation {
214 kind,
215 wire_req_id: req_id_u64,
216 },
217 );
218 env.remaining_deadline_ns = outbound_deadline_ns;
219 env.src_peer_addresses = src_peer_addresses.clone();
220 ctx.net.outbound.push(env);
221 }
222 None => {
223 ctx.net
224 .pending_peer_resolve_failures
225 .push((Some(*peer), ctx.current.op_ref));
226 ctx.bus.publish(bb_runtime::bus::NodeEvent::Infra(
227 bb_runtime::bus::InfraEvent::PeerResolveFailure {
228 peer: Some(*peer),
229 op_ref: ctx.current.op_ref,
230 },
231 ));
232 }
233 }
234 }
235 // Two outputs by position: (data, handle).
236 // - `data` is a structural placeholder on the producer side; the
237 // downstream Recv produces the actual receiver-side payload.
238 // A TriggerValue keeps the slot occupied so any accidental
239 // local read sees a non-empty signal.
240 // - `handle` carries the wire_req_id so downstream local ops can
241 // thread completion / timeout tracking even when the envelope
242 // never shipped (empty peer list, all unresolvable, etc.).
243 Ok(DispatchResult::Immediate(vec![
244 (
245 "data".to_string(),
246 Box::new(bb_runtime::syscall::values::TriggerValue) as Box<dyn SlotValue>,
247 ),
248 (
249 "handle".to_string(),
250 Box::new(WireReqIdValue(req_id_u64)) as Box<dyn SlotValue>,
251 ),
252 ]))
253}
254
255/// Look up a metadata_props entry by key, parsing the value as u64.
256fn read_metadata_u64(
257 props: &[bb_ir::proto::onnx::StringStringEntryProto],
258 key: &str,
259) -> Option<u64> {
260 props
261 .iter()
262 .find(|p| p.key == key)
263 .and_then(|p| p.value.parse().ok())
264}
265
266/// Look up an attribute by name, returning its i64 field as u64.
267fn read_attribute_u64(attrs: &[bb_ir::proto::onnx::AttributeProto], name: &str) -> Option<u64> {
268 attrs
269 .iter()
270 .find(|a| a.name == name)
271 .map(|a| a.i.max(0) as u64)
272}
273
274/// Map a `PeerId` onto a `NodeSiteId` for RTT tracker indexing.
275/// Hashes the full multihash digest with FNV-1a so two peers whose
276/// multihashes share a leading 8-byte prefix don't collide into the
277/// same RTT slot. Production deployments that distinguish multiple
278/// logical sites per physical peer (e.g. fast ping handler vs GPU
279/// compute handler on the same address) can swap this for an
280/// explicit `NodeSiteId` carried in the envelope.
281fn peer_to_site(peer: bb_runtime::ids::PeerId) -> bb_runtime::ids::NodeSiteId {
282 bb_runtime::ids::NodeSiteId::from(bb_runtime::slot_value::fnv1a_64(peer.digest()))
283}
284
285/// Pull the destination `PeerId` from the `dest`/`peer` input.
286/// The wire syscall then resolves the PeerId to its multi-address
287/// list via the framework's `AddressBook` and packs the list into
288/// `WireEnvelope.dest_peer_addresses`. On lookup miss (peer unknown
289/// or its address list is empty), the wire syscall emits
290/// `EngineStep::PeerResolveFailed` + `InfraEvent::PeerResolveFailure`
291/// per `docs/ADDRESSING.md` instead of shipping an envelope.
292///
293/// Pull the destination peer list from a Send op's inputs.
294///
295/// Vec-only: the position-1 input must resolve to a
296/// `PeerIdVecValue`. Samplers + broadcast components always emit
297/// a vector; a single-peer send wraps `[peer]` itself. Returns
298/// an empty `Vec` when no peer input resolves — composes
299/// naturally with samplers that returned no peers this round (the
300/// wire fires no envelopes; the local DAG continues).
301pub(crate) fn extract_dest_peers(
302 inputs: &[(&str, &dyn SlotValue)],
303 _ctx: &RuntimeResourceRef<'_>,
304) -> Vec<bb_runtime::ids::PeerId> {
305 // Closure: decode one input value into a peer vector. The
306 // `BytesValue` fallback handles `IngressEvent::Invoke` /
307 // `AppEvent` hosts that ship pre-encoded peer-vec bytes — the
308 // engine wraps those into a `BytesValue` carrier without per-
309 // slot `type_hash` plumbing on those ingress paths.
310 let try_decode = |h: &dyn SlotValue| -> Option<Vec<bb_runtime::ids::PeerId>> {
311 if let Some(p) = h.as_any().downcast_ref::<PeerIdVecValue>() {
312 return Some(p.0.clone());
313 }
314 if let Some(b) = h.as_any().downcast_ref::<BytesValue>() {
315 if let Ok(p) = bincode::deserialize::<PeerIdVecValue>(&b.0) {
316 return Some(p.0);
317 }
318 }
319 None
320 };
321
322 // 1. Named match — back-compat for hand-authored Sends.
323 for (name, h) in inputs {
324 if matches!(*name, "dest" | "dest_peer" | "peer" | "peers" | "peer_id") {
325 if let Some(v) = try_decode(*h) {
326 return v;
327 }
328 }
329 }
330 // 2. Position-based fallback — the DSL `g.net_out(name, peers, data)`
331 // emits `Send(input[0]=data, input[1]=peers)` regardless of
332 // the peer source's value name (e.g. `trainer_peers`,
333 // `agg_peers`, or a sampler's auto-minted output name).
334 if let Some((_, h)) = inputs.get(1) {
335 if let Some(v) = try_decode(*h) {
336 return v;
337 }
338 }
339 Vec::new()
340}
341
342/// Attribute prefix the compiler's `analyze_wire_edges` pass stamps
343/// on each producer Send NodeProto. The value is the canonical
344/// `Address` byte encoding of the destination multiaddr suffix
345/// (e.g. `/site/<NodeSiteId>` or `/component/<cref>/op/<name>`).
346const DEST_SUFFIX_ATTR_PREFIX: &str = "ai.bytesandbrains.dest_suffix.";
347
348/// Attribute prefix the compiler's `analyze_wire_edges` pass stamps
349/// when it has classified a per-input edge as trigger-only (the
350/// downstream Recv consumer reads only the firing signal, not the
351/// payload bytes). Takes priority over the `payload.is_empty()`
352/// heuristic — a compiler-attested classification is authoritative
353/// even when the value happens to encode to zero bytes.
354const TRIGGER_ONLY_ATTR_PREFIX: &str = "ai.bytesandbrains.trigger_only.";
355
356/// Collect every value input (anything not `dest*` or `req_id`) into
357/// a `SlotFill`. Each fill's `dest_suffix` is resolved in priority
358/// order:
359/// 1. Per-input attribute on the Send NodeProto (stamped by the
360/// compiler's `analyze_wire_edges` pass).
361/// 2. Companion `<name>_suffix` input (escape hatch for hosts that
362/// construct envelopes outside the compilation pipeline - e.g.
363/// sim harnesses).
364/// 3. Empty suffix (envelope receiver drops the fill silently).
365///
366/// `SlotFill.type_hash` is stamped from the producer-side
367/// `SlotValue::type_hash()` so the receiver dispatches by type
368/// instead of seeing the always-0 default.
369/// `SlotFill.trigger_only` resolves first to the compiler-stamped
370/// per-input attribute (`ai.bytesandbrains.trigger_only.<name>`),
371/// then falls back to the `payload.is_empty()` heuristic.
372fn collect_fills(
373 inputs: &[(&str, &dyn SlotValue)],
374 ctx: &RuntimeResourceRef<'_>,
375) -> Result<Vec<SlotFill>, OpError> {
376 // Index attribute-stamped suffixes by base input name.
377 let mut attr_suffixes: HashMap<String, Vec<u8>> = HashMap::new();
378 // Index compiler-stamped trigger-only classifications by base
379 // input name. Attribute value is a non-empty byte string for
380 // "trigger_only" / empty (or absent) for "data" / payload.
381 let mut attr_trigger_only: HashMap<String, bool> = HashMap::new();
382 for attr in ctx.current.node_attributes {
383 if let Some(base) = attr.name.strip_prefix(DEST_SUFFIX_ATTR_PREFIX) {
384 attr_suffixes.insert(base.to_string(), attr.s.clone());
385 }
386 if let Some(base) = attr.name.strip_prefix(TRIGGER_ONLY_ATTR_PREFIX) {
387 attr_trigger_only.insert(base.to_string(), !attr.s.is_empty() || attr.i != 0);
388 }
389 }
390
391 // Index companion-input suffixes for the fallback path. The
392 // companion input carries multiaddr bytes (typically an
393 // `AddressValue` or `BytesValue`); `to_wire_bytes` returns the
394 // raw encoded suffix. Encode failures surface as
395 // `OpError::ExecutionFailed` with the slot name in `detail` so
396 // the engine can drop the op gracefully.
397 let mut input_suffixes: HashMap<&str, Vec<u8>> = HashMap::new();
398 for (name, h) in inputs {
399 if let Some(base) = name.strip_suffix("_suffix") {
400 input_suffixes.insert(base, encode_or_error(name, *h)?);
401 }
402 }
403
404 let mut fills: Vec<SlotFill> = Vec::new();
405 for (name, h) in inputs {
406 if name.ends_with("_suffix")
407 || matches!(
408 *name,
409 "dest" | "dest_peer" | "peer" | "peers" | "peer_id" | "req_id"
410 )
411 {
412 continue;
413 }
414 let payload = encode_or_error(name, *h)?;
415 let dest_suffix = attr_suffixes
416 .get(*name)
417 .cloned()
418 .or_else(|| input_suffixes.get(*name).cloned())
419 .unwrap_or_default();
420 let trigger_only = attr_trigger_only
421 .get(*name)
422 .copied()
423 .unwrap_or(payload.is_empty());
424 let type_hash = h.type_hash();
425 fills.push(SlotFill {
426 dest_suffix,
427 payload,
428 trigger_only,
429 type_hash,
430 });
431 }
432 Ok(fills)
433}
434
435
436use bb_runtime::registry::OpRegistration as _BbOpsSyscallReg;
437
438inventory::submit! {
439 _BbOpsSyscallReg {
440 domain: WIRE_DOMAIN,
441 op_type: "Send",
442 invoke,
443 kind: bb_runtime::registry::RegistrationKind::Syscall,
444 }
445}
446
447inventory::submit! {
448 _BbOpsSyscallReg {
449 domain: WIRE_DOMAIN,
450 op_type: "Recv",
451 invoke: invoke_recv,
452 kind: bb_runtime::registry::RegistrationKind::Syscall,
453 }
454}