// bb_ops/network/wire/mod.rs

//! `ai.bytesandbrains.wire v1` opset - engine-native.
//!
//! Two ops, exactly one of which the user authors:
//!
//! - `Send` (user-facing): takes a payload plus a typed `peer` input
//!   carrying a list of [`PeerId`](bb_runtime::ids::PeerId)s; resolves
//!   each peer to its address list through `ctx.peers.addresses`, builds
//!   one `SlotFill` per non-peer input from the compiler-stamped
//!   `dest_suffix` metadata, ships one envelope per resolved peer, and
//!   returns `(data, handle)` - a structural-placeholder `Trigger` plus
//!   the request's `wire_req_id` (reused from the inbound envelope when
//!   forwarding; otherwise the compiler-stamped `chain_id`, or a freshly
//!   minted token when no `chain_id` is present).
//!
//! - `Recv` (framework-synthesized): emitted on receiver partitions
//!   by [`bb_compiler::synthesize_wire_recvs()`]. Pure structural
//!   placeholder so the receiver's `FunctionProto` stays a closed
//!   DAG and the consumer's input value name resolves to a
//!   `NodeSiteId` that the inbound `deliver_fill` writes into. The
//!   dispatch handler returns no outputs - `Recv` never lands on the
//!   frontier in normal flow.
//!
//! Both ops register through the same path as every other syscall
//! (`Engine::register_syscall`). There is no separate `WireRuntime`
//! binding - wire is engine infrastructure, not a user role.

use std::collections::HashMap;
use std::num::NonZeroU64;

use bb_ir::proto::onnx::NodeProto;
use bb_runtime::atomic::DispatchResult;
use bb_runtime::bus::OpError;
use bb_runtime::envelope::{SlotFill, WireCorrelation, WireEnvelope};
use bb_runtime::runtime::RuntimeResourceRef;
use bb_runtime::slot_value::SlotValue;
use bb_runtime::syscall::values::{BytesValue, PeerIdVecValue, WireReqIdValue};

/// ONNX-style domain string under which the wire ops (`Send`, `Recv`)
/// are registered.
pub const WIRE_DOMAIN: &str = "ai.bytesandbrains.wire";

/// Version number of the wire opset.
pub const WIRE_VERSION: i64 = 1;

/// Zero-sized marker type for `register_syscall::<SendOp>`.
///
/// Carries no data; it exists only to name the `Send` op at the type
/// level. The derives make it printable and freely copyable like any
/// other unit marker.
#[derive(Debug, Clone, Copy, Default)]
pub struct SendOp;

/// Zero-sized marker type for `register_syscall::<RecvOp>`.
///
/// Carries no data; it exists only to name the `Recv` op at the type
/// level. The derives make it printable and freely copyable like any
/// other unit marker.
#[derive(Debug, Clone, Copy, Default)]
pub struct RecvOp;

47/// `Send` dispatch entry point matching `StatelessInvokeFn`.
48pub fn invoke(
49    _node: &NodeProto,
50    inputs: &[(&str, &dyn SlotValue)],
51    ctx: &mut RuntimeResourceRef<'_>,
52) -> Result<DispatchResult, OpError> {
53    invoke_send(inputs, ctx)
54}
55
56/// Encode a single slot value to its wire bytes. Surfaces encode
57/// failures as `OpError::ExecutionFailed` carrying the slot name
58/// and the underlying error, so a malformed payload drops the op
59/// instead of crashing the node.
60fn encode_or_error(name: &str, value: &dyn SlotValue) -> Result<Vec<u8>, OpError> {
61    value.to_wire_bytes().map_err(|e| OpError {
62        kind: bb_runtime::bus::OpErrorKind::ExecutionFailed,
63        reason: "wire_encode_failed",
64        detail: format!("wire encode of slot `{name}` failed: {e}"),
65    })
66}
67
68/// `Recv` dispatch entry point matching `StatelessInvokeFn`. Pure
69/// structural placeholder; downstream consumers are pushed onto the
70/// frontier when inbound data-plane delivery seeds the slot, not by
71/// this dispatch.
72pub fn invoke_recv(
73    _node: &NodeProto,
74    _inputs: &[(&str, &dyn SlotValue)],
75    _ctx: &mut RuntimeResourceRef<'_>,
76) -> Result<DispatchResult, OpError> {
77    Ok(DispatchResult::Immediate(vec![]))
78}
79
80/// Construct an envelope with the resolved address list, one or
81/// more fills and the given correlation. `dest_peer_addresses` is
82/// the resolved snapshot of `AddressBook::lookup(peer)` at dispatch
83/// time per `docs/ADDRESSING.md`. The host's transport adapter
84/// picks one of these entries based on its networking capabilities.
85/// Each fill carries its own per-slot multiaddr `dest_suffix` for
86/// *intra-node* slot routing only.
87fn build_envelope(
88    dest_peer_addresses: Vec<Vec<u8>>,
89    fills: Vec<SlotFill>,
90    correlation: WireCorrelation,
91) -> WireEnvelope {
92    WireEnvelope {
93        dest_peer_addresses,
94        fills,
95        correlation: Some(correlation),
96        remaining_deadline_ns: 0,
97        edge_rtt_reports: Vec::new(),
98        ..Default::default()
99    }
100}
101
102pub(crate) fn invoke_send(
103    inputs: &[(&str, &dyn SlotValue)],
104    ctx: &mut RuntimeResourceRef<'_>,
105) -> Result<DispatchResult, OpError> {
106    let dest_peers = extract_dest_peers(inputs, ctx);
107    let fills = collect_fills(inputs, ctx)?;
108
109    // Decide ORIGINATOR vs FORWARDER role:
110    // - Forwarder: this Send fires inside an inbound-delivery cascade
111    //   AND the inbound envelope carried a non-zero correlation
112    //   wire_req_id. Reuse that token; skip register_in_flight.
113    // - Originator: read `chain_id` from the Send NodeProto's
114    //   metadata (stamped by the compiler's `analyze_round_trips`
115    //   pass) and use it as the wire_req_id; if absent, mint fresh.
116    //   Register the in-flight entry so the eventual response back
117    //   to this site is observable.
118    let (req_id_u64, is_forwarder) = match ctx.current.inbound.wire_req_id {
119        Some(inbound) if inbound != 0 => (inbound, true),
120        _ => {
121            // Originator path.
122            let chain_id =
123                read_metadata_u64(ctx.current.node_metadata, "ai.bytesandbrains.wire.chain_id");
124            let token = chain_id.unwrap_or_else(|| ctx.net.requests.mint_token().as_u64());
125            (token, false)
126        }
127    };
128
129    // Derive Dapper-style outbound `remaining_deadline_ns`. When
130    // forwarding inside a cascade, subtract elapsed local time from
131    // the inbound's remaining budget. Otherwise read the static
132    // `deadline_ns` attribute stamped by `derive_wire_deadlines`.
133    let chain_ctx = ctx.read_chain_context();
134    let mut outbound_deadline_ns: u64 = 0;
135    if let (Some(inbound_remaining), Some(arrival_ns)) = (
136        ctx.current.inbound.remaining_deadline_ns,
137        ctx.current.inbound.arrival_ns,
138    ) {
139        let now_ns = ctx.time.scheduler.now_ns();
140        let elapsed = now_ns.saturating_sub(arrival_ns);
141        outbound_deadline_ns = inbound_remaining.saturating_sub(elapsed);
142    } else if let Some(static_deadline_ns) =
143        read_attribute_u64(ctx.current.node_attributes, "deadline_ns")
144    {
145        outbound_deadline_ns = static_deadline_ns;
146    } else if let Some(first_peer) = dest_peers.first().copied() {
147        // Last resort: adaptive RTT-tracker estimate, indexed by the
148        // first peer's site (representative for fan-out).
149        outbound_deadline_ns = ctx.estimate_wire_budget_ns(
150            peer_to_site(first_peer),
151            chain_ctx,
152            bb_ir::syscall_ids::DEFAULT_PER_HOP_BUDGET_NS,
153        );
154    }
155
156    if let Some(first_peer) = dest_peers.first().copied() {
157        if !is_forwarder {
158            // Originator registers ONE in-flight entry per Send call
159            // (not per fan-out peer) — the `wire_req_id` is shared
160            // across all envelopes in the fan-out, and the first
161            // response satisfies the request. Forwarders skip;
162            // the upstream originator's entry tracks the chain.
163            //
164            // `ttl_ns` is `NonZeroU64`; if the deadline
165            // budget collapsed to 0 (a degenerate but possible
166            // outcome of the deadline cascade above), substitute the
167            // minimum 1 ns so the in-flight entry still expires
168            // rather than living forever.
169            let target_site = peer_to_site(first_peer);
170            let ttl = std::num::NonZeroU64::new(outbound_deadline_ns)
171                .unwrap_or(unsafe { std::num::NonZeroU64::new_unchecked(1) });
172            ctx.net.requests.register_in_flight(
173                req_id_u64,
174                ctx.time.scheduler.now_ns(),
175                target_site,
176                chain_ctx,
177                ttl,
178                None,
179            );
180        }
181    }
182
183    // Empty peer list → no envelopes (composes naturally with
184    // samplers that returned no peers this round).
185    let kind = if req_id_u64 != 0 {
186        bb_runtime::envelope::CorrelationKind::Request as i32
187    } else {
188        bb_runtime::envelope::CorrelationKind::None as i32
189    };
190    // Snapshot the sender's local-address list once per Send call.
191    // The receiver merges this into its AddressBook entry for the
192    // sender so future replies can dial back on any reachable
193    // interface; an empty Vec stamps zero entries and the receiver
194    // leaves its existing entry untouched.
195    let src_peer_addresses: Vec<Vec<u8>> =
196        ctx.local_addresses().iter().map(|a| a.to_bytes()).collect();
197
198    for peer in &dest_peers {
199        // Resolve the peer's address list. Per-peer failure surfaces
200        // as PeerResolveFailed but does NOT abort the fan-out — other
201        // destinations may still receive.
202        let resolved: Option<Vec<Vec<u8>>> = ctx
203            .peers
204            .addresses
205            .lookup(*peer)
206            .filter(|s| !s.is_empty())
207            .map(|s| s.iter().map(|a| a.to_bytes()).collect());
208        match resolved {
209            Some(dest_peer_addresses) => {
210                let mut env = build_envelope(
211                    dest_peer_addresses,
212                    fills.clone(),
213                    WireCorrelation {
214                        kind,
215                        wire_req_id: req_id_u64,
216                    },
217                );
218                env.remaining_deadline_ns = outbound_deadline_ns;
219                env.src_peer_addresses = src_peer_addresses.clone();
220                ctx.net.outbound.push(env);
221            }
222            None => {
223                ctx.net
224                    .pending_peer_resolve_failures
225                    .push((Some(*peer), ctx.current.op_ref));
226                ctx.bus.publish(bb_runtime::bus::NodeEvent::Infra(
227                    bb_runtime::bus::InfraEvent::PeerResolveFailure {
228                        peer: Some(*peer),
229                        op_ref: ctx.current.op_ref,
230                    },
231                ));
232            }
233        }
234    }
235    // Two outputs by position: (data, handle).
236    // - `data` is a structural placeholder on the producer side; the
237    //   downstream Recv produces the actual receiver-side payload.
238    //   A TriggerValue keeps the slot occupied so any accidental
239    //   local read sees a non-empty signal.
240    // - `handle` carries the wire_req_id so downstream local ops can
241    //   thread completion / timeout tracking even when the envelope
242    //   never shipped (empty peer list, all unresolvable, etc.).
243    Ok(DispatchResult::Immediate(vec![
244        (
245            "data".to_string(),
246            Box::new(bb_runtime::syscall::values::TriggerValue) as Box<dyn SlotValue>,
247        ),
248        (
249            "handle".to_string(),
250            Box::new(WireReqIdValue(req_id_u64)) as Box<dyn SlotValue>,
251        ),
252    ]))
253}
254
255/// Look up a metadata_props entry by key, parsing the value as u64.
256fn read_metadata_u64(
257    props: &[bb_ir::proto::onnx::StringStringEntryProto],
258    key: &str,
259) -> Option<u64> {
260    props
261        .iter()
262        .find(|p| p.key == key)
263        .and_then(|p| p.value.parse().ok())
264}
265
266/// Look up an attribute by name, returning its i64 field as u64.
267fn read_attribute_u64(attrs: &[bb_ir::proto::onnx::AttributeProto], name: &str) -> Option<u64> {
268    attrs
269        .iter()
270        .find(|a| a.name == name)
271        .map(|a| a.i.max(0) as u64)
272}
273
274/// Map a `PeerId` onto a `NodeSiteId` for RTT tracker indexing.
275/// Hashes the full multihash digest with FNV-1a so two peers whose
276/// multihashes share a leading 8-byte prefix don't collide into the
277/// same RTT slot. Production deployments that distinguish multiple
278/// logical sites per physical peer (e.g. fast ping handler vs GPU
279/// compute handler on the same address) can swap this for an
280/// explicit `NodeSiteId` carried in the envelope.
281fn peer_to_site(peer: bb_runtime::ids::PeerId) -> bb_runtime::ids::NodeSiteId {
282    bb_runtime::ids::NodeSiteId::from(bb_runtime::slot_value::fnv1a_64(peer.digest()))
283}
284
285/// Pull the destination `PeerId` from the `dest`/`peer` input.
286/// The wire syscall then resolves the PeerId to its multi-address
287/// list via the framework's `AddressBook` and packs the list into
288/// `WireEnvelope.dest_peer_addresses`. On lookup miss (peer unknown
289/// or its address list is empty), the wire syscall emits
290/// `EngineStep::PeerResolveFailed` + `InfraEvent::PeerResolveFailure`
291/// per `docs/ADDRESSING.md` instead of shipping an envelope.
292///
293/// Pull the destination peer list from a Send op's inputs.
294///
295/// Vec-only: the position-1 input must resolve to a
296/// `PeerIdVecValue`. Samplers + broadcast components always emit
297/// a vector; a single-peer send wraps `[peer]` itself. Returns
298/// an empty `Vec` when no peer input resolves — composes
299/// naturally with samplers that returned no peers this round (the
300/// wire fires no envelopes; the local DAG continues).
301pub(crate) fn extract_dest_peers(
302    inputs: &[(&str, &dyn SlotValue)],
303    _ctx: &RuntimeResourceRef<'_>,
304) -> Vec<bb_runtime::ids::PeerId> {
305    // Closure: decode one input value into a peer vector. The
306    // `BytesValue` fallback handles `IngressEvent::Invoke` /
307    // `AppEvent` hosts that ship pre-encoded peer-vec bytes — the
308    // engine wraps those into a `BytesValue` carrier without per-
309    // slot `type_hash` plumbing on those ingress paths.
310    let try_decode = |h: &dyn SlotValue| -> Option<Vec<bb_runtime::ids::PeerId>> {
311        if let Some(p) = h.as_any().downcast_ref::<PeerIdVecValue>() {
312            return Some(p.0.clone());
313        }
314        if let Some(b) = h.as_any().downcast_ref::<BytesValue>() {
315            if let Ok(p) = bincode::deserialize::<PeerIdVecValue>(&b.0) {
316                return Some(p.0);
317            }
318        }
319        None
320    };
321
322    // 1. Named match — back-compat for hand-authored Sends.
323    for (name, h) in inputs {
324        if matches!(*name, "dest" | "dest_peer" | "peer" | "peers" | "peer_id") {
325            if let Some(v) = try_decode(*h) {
326                return v;
327            }
328        }
329    }
330    // 2. Position-based fallback — the DSL `g.net_out(name, peers, data)`
331    //    emits `Send(input[0]=data, input[1]=peers)` regardless of
332    //    the peer source's value name (e.g. `trainer_peers`,
333    //    `agg_peers`, or a sampler's auto-minted output name).
334    if let Some((_, h)) = inputs.get(1) {
335        if let Some(v) = try_decode(*h) {
336            return v;
337        }
338    }
339    Vec::new()
340}
341
/// Name prefix of the per-input destination-suffix attributes that the
/// compiler's `analyze_wire_edges` pass stamps on each producer Send
/// NodeProto. The text after the prefix is the input name; the value is
/// the canonical `Address` byte encoding of the destination multiaddr
/// suffix (e.g. `/site/<NodeSiteId>` or `/component/<cref>/op/<name>`).
const DEST_SUFFIX_ATTR_PREFIX: &str = "ai.bytesandbrains.dest_suffix.";

/// Name prefix of the per-input attributes that the compiler's
/// `analyze_wire_edges` pass stamps when it has classified an edge as
/// trigger-only (the downstream Recv consumer reads only the firing
/// signal, not the payload bytes). The stamped classification takes
/// priority over the `payload.is_empty()` heuristic: it is
/// authoritative even when the value happens to encode to zero bytes.
const TRIGGER_ONLY_ATTR_PREFIX: &str = "ai.bytesandbrains.trigger_only.";

356/// Collect every value input (anything not `dest*` or `req_id`) into
357/// a `SlotFill`. Each fill's `dest_suffix` is resolved in priority
358/// order:
359///   1. Per-input attribute on the Send NodeProto (stamped by the
360///      compiler's `analyze_wire_edges` pass).
361///   2. Companion `<name>_suffix` input (escape hatch for hosts that
362///      construct envelopes outside the compilation pipeline - e.g.
363///      sim harnesses).
364///   3. Empty suffix (envelope receiver drops the fill silently).
365///
366/// `SlotFill.type_hash` is stamped from the producer-side
367/// `SlotValue::type_hash()` so the receiver dispatches by type
368/// instead of seeing the always-0 default.
369/// `SlotFill.trigger_only` resolves first to the compiler-stamped
370/// per-input attribute (`ai.bytesandbrains.trigger_only.<name>`),
371/// then falls back to the `payload.is_empty()` heuristic.
372fn collect_fills(
373    inputs: &[(&str, &dyn SlotValue)],
374    ctx: &RuntimeResourceRef<'_>,
375) -> Result<Vec<SlotFill>, OpError> {
376    // Index attribute-stamped suffixes by base input name.
377    let mut attr_suffixes: HashMap<String, Vec<u8>> = HashMap::new();
378    // Index compiler-stamped trigger-only classifications by base
379    // input name. Attribute value is a non-empty byte string for
380    // "trigger_only" / empty (or absent) for "data" / payload.
381    let mut attr_trigger_only: HashMap<String, bool> = HashMap::new();
382    for attr in ctx.current.node_attributes {
383        if let Some(base) = attr.name.strip_prefix(DEST_SUFFIX_ATTR_PREFIX) {
384            attr_suffixes.insert(base.to_string(), attr.s.clone());
385        }
386        if let Some(base) = attr.name.strip_prefix(TRIGGER_ONLY_ATTR_PREFIX) {
387            attr_trigger_only.insert(base.to_string(), !attr.s.is_empty() || attr.i != 0);
388        }
389    }
390
391    // Index companion-input suffixes for the fallback path. The
392    // companion input carries multiaddr bytes (typically an
393    // `AddressValue` or `BytesValue`); `to_wire_bytes` returns the
394    // raw encoded suffix. Encode failures surface as
395    // `OpError::ExecutionFailed` with the slot name in `detail` so
396    // the engine can drop the op gracefully.
397    let mut input_suffixes: HashMap<&str, Vec<u8>> = HashMap::new();
398    for (name, h) in inputs {
399        if let Some(base) = name.strip_suffix("_suffix") {
400            input_suffixes.insert(base, encode_or_error(name, *h)?);
401        }
402    }
403
404    let mut fills: Vec<SlotFill> = Vec::new();
405    for (name, h) in inputs {
406        if name.ends_with("_suffix")
407            || matches!(
408                *name,
409                "dest" | "dest_peer" | "peer" | "peers" | "peer_id" | "req_id"
410            )
411        {
412            continue;
413        }
414        let payload = encode_or_error(name, *h)?;
415        let dest_suffix = attr_suffixes
416            .get(*name)
417            .cloned()
418            .or_else(|| input_suffixes.get(*name).cloned())
419            .unwrap_or_default();
420        let trigger_only = attr_trigger_only
421            .get(*name)
422            .copied()
423            .unwrap_or(payload.is_empty());
424        let type_hash = h.type_hash();
425        fills.push(SlotFill {
426            dest_suffix,
427            payload,
428            trigger_only,
429            type_hash,
430        });
431    }
432    Ok(fills)
433}
434
435
436use bb_runtime::registry::OpRegistration as _BbOpsSyscallReg;
437
438inventory::submit! {
439    _BbOpsSyscallReg {
440        domain: WIRE_DOMAIN,
441        op_type: "Send",
442        invoke,
443        kind: bb_runtime::registry::RegistrationKind::Syscall,
444    }
445}
446
447inventory::submit! {
448    _BbOpsSyscallReg {
449        domain: WIRE_DOMAIN,
450        op_type: "Recv",
451        invoke: invoke_recv,
452        kind: bb_runtime::registry::RegistrationKind::Syscall,
453    }
454}