bb_runtime/runtime.rs
1//! Runtime resource handle threaded into every `dispatch_atomic`
2//! call.
3//!
//! The engine constructs a `RuntimeResourceRef` by split-borrowing
//! the framework-primitive bundle + the bus before each
//! `dispatch_atomic` call. Each framework primitive and the bus is
//! held as its own distinct `&mut`, so the borrow checker enforces
//! field-level exclusivity — an Op may touch any subset. The
//! exceptions are `ingress` (a shared `Arc`) and `components` (a
//! read-only view).
9//!
10//! Async-completing impls call `ctx.complete_command(cmd_id,
11//! results)`; the engine drains `pending_completions` after the
12//! hook returns and routes them through `handle_completion`.
13
14use std::sync::Arc;
15
16use crate::bus::{AllocFailReason, AppIngressErrorKind, AppIngressSource, NodeEvent, TypedBus};
17use crate::completion::{CompletionHandle, CompletionSink};
18use crate::framework::rtt_tracker::{chain_id_from_targets, ChainContext};
19use crate::framework::{
20 rtt_tracker::RttTracker, AddressBook, BackoffTable, BackpressureTracker, EventSource,
21 HoldTable, InboundDedup, OutboundQueue, PeerGate, PeerGovernor, RecordBuffer, RequestTracker,
22 RngU64Source, Scheduler, SerializeQueue,
23};
24use crate::ids::{CommandId, OpRef};
25use crate::ingress::{IngressEvent, IngressQueue, COMPLETION_DETAIL_CAP};
26use crate::slot_value::SlotValue;
27
28impl CompletionSink for IngressQueue {
29 fn complete(&self, cmd_id: CommandId, result_bytes: &[u8]) {
30 // Per Principle 1a: `result_bytes` is borrowed from the
31 // caller's stack/transport buffer. Cap-check, fallibly
32 // reserve framework-owned storage, then copy. The owned
33 // `Vec<u8>` rides into the engine via the `Completion`
34 // variant; cap / alloc failures publish an `AppIngressError`
35 // sibling and drop the result (the parked op times out
36 // naturally — same surface as a missing completion).
37 let byte_count = result_bytes.len();
38 let cap = self.completion_result_cap();
39 if byte_count > cap {
40 let _ = self.push(IngressEvent::AppIngressError {
41 source: AppIngressSource::Completion { command: cmd_id },
42 byte_count,
43 kind: AppIngressErrorKind::PerItemCapExceeded { cap },
44 });
45 return;
46 }
47 let mut owned: Vec<u8> = Vec::new();
48 if crate::fallible::try_reserve_exact(&mut owned, byte_count).is_err() {
49 let _ = self.push(IngressEvent::AppIngressError {
50 source: AppIngressSource::Completion { command: cmd_id },
51 byte_count,
52 kind: AppIngressErrorKind::AllocationFailed {
53 reason: AllocFailReason::HeapExhausted,
54 },
55 });
56 return;
57 }
58 owned.extend_from_slice(result_bytes);
59 let _ = self.push(IngressEvent::Completion {
60 cmd_id,
61 results: vec![owned],
62 });
63 }
64
65 fn fail(&self, cmd_id: CommandId, detail: &str) {
66 // Push the typed `CompletionFailed` variant so the parked
67 // op fails through `handle_completion_failed` → typed
68 // `OpFailed`, not via success-bytes masquerading as a
69 // completion.
70 //
71 // The detail string is truncated rather than rejected at
72 // `COMPLETION_DETAIL_CAP`: a `Display`-rendered failure must
73 // always land so the component sees a real failure instead
74 // of a missing completion masquerading as a timeout.
75 let truncated = if detail.len() > COMPLETION_DETAIL_CAP {
76 let mut end = COMPLETION_DETAIL_CAP;
77 while end > 0 && !detail.is_char_boundary(end) {
78 end -= 1;
79 }
80 &detail[..end]
81 } else {
82 detail
83 };
84 let owned: String = truncated.to_string();
85 let _ = self.push(IngressEvent::CompletionFailed {
86 cmd_id,
87 detail: owned,
88 });
89 }
90}
91
/// Per-peer state borrowed mutably during dispatch.
///
/// Every field is an exclusive (`&mut`) borrow with lifetime `'a`, so
/// the individual primitives can be used independently of one another.
pub struct PeerCtx<'a> {
    /// Per-peer concurrency limiter.
    pub gate: &'a mut PeerGate,
    /// Per-peer exponential backoff state.
    pub backoff: &'a mut BackoffTable,
    /// Peer policy + health source-of-truth.
    pub governor: &'a mut PeerGovernor,
    /// `PeerId → (Vec<Address>, ref_count)` registry.
    pub addresses: &'a mut AddressBook,
    /// Receiver-side back-pressure tracker. RX gates + ingress
    /// detection sites consult this to record overload and decide
    /// between emitting a typed `BackoffNotice` envelope or silently
    /// dropping.
    pub backpressure: &'a mut BackpressureTracker,
}
108
/// Network/transport state borrowed mutably during dispatch.
///
/// Every field is an exclusive (`&mut`) borrow with lifetime `'a`.
pub struct NetCtx<'a> {
    /// FIFO of wire envelopes ready to ship on the next outbound drain.
    pub outbound: &'a mut OutboundQueue,
    /// Per-NodeSiteId adaptive RTT tracker.
    pub rtt: &'a mut RttTracker,
    /// In-flight wire-request → CommandId map.
    pub requests: &'a mut RequestTracker,
    /// Sliding-window seen-message tracker.
    pub dedup: &'a mut InboundDedup,
    /// Peer-resolution failures captured during this poll cycle.
    /// Each entry pairs an optional `PeerId` with an `OpRef`
    /// (presumably the peer that failed to resolve and the op that
    /// asked for it — confirm against the producer).
    pub pending_peer_resolve_failures: &'a mut Vec<(Option<crate::ids::PeerId>, crate::ids::OpRef)>,
}
122
/// Time / scheduling state borrowed mutably during dispatch.
///
/// Currently a single-field group holding an exclusive borrow of the
/// scheduler.
pub struct TimeCtx<'a> {
    /// Sorted timer heap.
    pub scheduler: &'a mut Scheduler,
}
128
/// Syscall-side state (storage, RNG, latches, app-event drain).
///
/// Every field is an exclusive (`&mut`) borrow with lifetime `'a`.
pub struct SyscallCtx<'a> {
    /// Named-FIFO map for `Serialize.Enqueue` / `Dequeue`.
    pub serialize_queue: &'a mut SerializeQueue,
    /// Named-slot value buffer for `Hold.Stash` / `Flush`.
    pub hold_table: &'a mut HoldTable,
    /// Per-name bounded ring buffer for `Record`.
    pub record_buffer: &'a mut RecordBuffer,
    /// Registered `EventKind → ComponentTag` subscriptions.
    pub event_source: &'a mut EventSource,
    /// Per-Node counters bumped by `IncrMetric`, keyed by name.
    pub counters: &'a mut std::collections::HashMap<String, u64>,
    /// Per-group first-arrival latch for the `Any` syscall, keyed by
    /// group name.
    pub any_fired_groups: &'a mut std::collections::HashSet<String>,
    /// Per-`(OpRef, ExecId)` latch for the `DeadlineMatch` syscall.
    /// Stored as a raw `(u64, u64)` pair (presumably the numeric forms
    /// of the two ids, in that order — confirm against the syscall).
    pub deadline_match_fired: &'a mut std::collections::HashSet<(u64, u64)>,
    /// `u64` RNG source for the `RngU64` syscall. Held as a trait
    /// object, so the concrete generator is chosen by the caller.
    pub rng: &'a mut dyn RngU64Source,
    /// App events pending emission on the next poll's outbound drain.
    pub pending_app_events: &'a mut Vec<crate::bus::AppEvent>,
}
150
/// Inbound-envelope context captured at delivery time and threaded
/// into every op dispatched as part of the cascade.
///
/// The struct is `Copy` and `Default`; the derived default leaves
/// every field `None`.
#[derive(Clone, Copy, Debug, Default)]
pub struct InboundCtx {
    /// The `PeerId` of the inbound envelope's transport-reported
    /// source. `None` outside the inbound delivery path.
    pub src_peer: Option<crate::ids::PeerId>,
    /// The inbound envelope's `wire_req_id` correlation token.
    pub wire_req_id: Option<u64>,
    /// Engine-clock timestamp the envelope arrived at, in nanoseconds.
    pub arrival_ns: Option<u64>,
    /// Remaining deadline budget propagated by the sender, in
    /// nanoseconds.
    pub remaining_deadline_ns: Option<u64>,
}
165
/// State scoped to the currently-dispatching op (NodeProto-level
/// metadata, the op's identity, completion drain, command-id mint).
pub struct CurrentCallCtx<'a> {
    /// The `OpRef` of the Op currently being dispatched.
    pub op_ref: OpRef,
    /// The `ExecId` this dispatch belongs to. Syscalls that latch
    /// per-execution (`DeadlineMatch`, `Any`) key on
    /// `(op_ref, exec_id)` so a fresh execution starts unlatched.
    pub exec_id: crate::ids::ExecId,
    /// The Node's own `PeerId`.
    pub self_peer: crate::ids::PeerId,
    /// Attributes of the NodeProto being dispatched.
    pub node_attributes: &'a [bb_ir::proto::onnx::AttributeProto],
    /// Metadata_props of the NodeProto being dispatched.
    pub node_metadata: &'a [bb_ir::proto::onnx::StringStringEntryProto],
    /// Inbound-envelope context: source peer, wire request id, arrival
    /// time and remaining deadline (see [`InboundCtx`]).
    pub inbound: InboundCtx,
    /// Completions captured during this dispatch. Owned rather than
    /// borrowed; `RuntimeResourceRef::complete_command` appends here.
    pub pending_completions: Vec<PendingCompletion>,
    /// Engine's monotonic CommandId source: the next raw id to hand out.
    pub next_command_id: &'a mut u64,
}
188
/// Engine-resource handle threaded into every `dispatch_atomic`
/// call. Per `docs/ENGINE.md` §10. Fields are grouped by concern -
/// `peers`/`net`/`time`/`syscall` carry the framework primitive
/// references; `current` carries per-op state; `bus`/`ingress`
/// stay top-level; `components` exposes the cross-component
/// read-only surface.
pub struct RuntimeResourceRef<'a> {
    /// Per-peer state (gate, backoff, governor, addresses,
    /// back-pressure tracker).
    pub peers: PeerCtx<'a>,
    /// Network/transport state (outbound queue, RTT, requests, dedup,
    /// pending peer-resolve failures).
    pub net: NetCtx<'a>,
    /// Time / scheduling state.
    pub time: TimeCtx<'a>,
    /// Syscall-side state (storage, RNG, latches, app-event drain).
    pub syscall: SyscallCtx<'a>,
    /// The in-Node typed event bus.
    pub bus: &'a mut TypedBus,
    /// Shared handle to the Node's ingress queue. Unlike the other
    /// resources this is a reference-counted handle, not a `&mut`.
    pub ingress: Arc<IngressQueue>,
    /// Read-only view onto sibling components registered on the Node.
    pub components: ComponentsView<'a>,
    /// State scoped to the currently-dispatching op.
    pub current: CurrentCallCtx<'a>,
}
213
/// Read-only view onto sibling components registered on the Node.
/// Constructed by the engine at dispatch time from
/// `&engine.components` + `&engine.slots`. The Vec is indexed by
/// `ComponentRef.as_u32() as usize`; the slot the currently-
/// dispatching component lives in is `None` for the duration of
/// the dispatch (take-and-restore in `invoke_atomic`), so callers
/// can't accidentally re-enter themselves.
///
/// The derived `Default` leaves both fields `None`, i.e. a view with
/// no engine context on which every lookup misses.
#[derive(Default)]
pub struct ComponentsView<'a> {
    /// All registered components indexed by `ComponentRef.as_u32()`,
    /// borrowed from `engine.components`. `None` outside engine
    /// context (test setups bypassing the registry).
    pub instances: Option<&'a [Option<Box<dyn crate::component::ErasedComponent>>]>,
    /// Author-chosen-slot-name → `ComponentRef` map, borrowed from
    /// `engine.slots`. THE canonical dependency-resolution surface.
    /// `None` outside engine context.
    pub slots: Option<&'a std::collections::HashMap<String, crate::ids::ComponentRef>>,
}
232
233impl ComponentsView<'_> {
234 /// Look up the component bound at `slot_name`. The generic
235 /// dependency-resolution surface — Components reach their
236 /// declared dependencies through this accessor. Returns `None`
237 /// when no slot of that name is bound or when the view has no
238 /// engine context.
239 pub fn for_slot(&self, slot_name: &str) -> Option<&dyn crate::component::ErasedComponent> {
240 let slots = self.slots?;
241 let instances = self.instances?;
242 let cref = slots.get(slot_name)?;
243 let idx = cref.as_u32() as usize;
244 instances.get(idx)?.as_deref()
245 }
246
247 /// Look up the component bound at `slot_name` AND downcast it
248 /// to `&T`. The typed counterpart to [`Self::for_slot`].
249 /// Returns `None` when the slot is unbound OR when the bound
250 /// concrete is not a `T`.
251 ///
252 /// In production this is reached through
253 /// [`crate::runtime::RuntimeResourceRef::dependency`], which
254 /// wraps the lookup in [`DependencyError`] variants for typed
255 /// error reporting.
256 pub fn for_slot_as<T: 'static>(&self, slot_name: &str) -> Option<&T> {
257 let erased = self.for_slot(slot_name)?;
258 let any: &dyn std::any::Any = erased;
259 any.downcast_ref::<T>()
260 }
261}
262
/// Errors surfaced by [`RuntimeResourceRef::dependency`].
///
/// In production the dependency is verified at compile time by
/// `resolve_component_dependencies`, so a runtime miss represents
/// a framework invariant breach (e.g. someone bypassed
/// `Node::install` to register a custom slot mapping). The error
/// is returned rather than raised as a panic so test fixtures and
/// introspection tooling can probe the surface without aborting the
/// process.
#[derive(Debug)]
pub enum DependencyError {
    /// The requested slot has no component bound to it.
    NotBound {
        /// Name of the slot the caller asked for.
        slot: String,
    },
    /// The slot is bound, but its concrete type is not the one the
    /// caller asked for, so the downcast failed.
    TypeMismatch {
        /// Name of the slot the caller asked for.
        slot: String,
        /// `std::any::type_name` of the type the caller expected.
        expected: &'static str,
    },
}

impl std::fmt::Display for DependencyError {
    /// Render a one-line, human-readable description that quotes the
    /// slot name (and, for a mismatch, the expected type) in backticks.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DependencyError::NotBound { slot } => {
                f.write_fmt(format_args!("no component bound at slot `{slot}`"))
            }
            DependencyError::TypeMismatch { slot, expected } => {
                f.write_fmt(format_args!("component at slot `{slot}` is not a `{expected}`"))
            }
        }
    }
}

// No `source()` override: neither variant wraps an underlying error.
impl std::error::Error for DependencyError {}
301
302impl RuntimeResourceRef<'_> {
303 /// Ordered local-address bag for this Node. Reads the
304 /// AddressBook entry keyed by `self.current.self_peer`; returns
305 /// an empty slice when no local addresses are registered.
306 /// Wire ops + identity-bearing protocol replies (Announce,
307 /// Handshake) stamp this onto their outbound envelopes so
308 /// receivers can dial back on every reachable interface.
309 pub fn local_addresses(&self) -> &[crate::framework::Address] {
310 self.peers
311 .addresses
312 .lookup(self.current.self_peer)
313 .unwrap_or(&[])
314 }
315
316 /// Typed accessor for an author-declared dependency. Resolves
317 /// `slot_name` against the engine's generic slot registry and
318 /// downcasts the bound `ErasedComponent` to `&T`.
319 ///
320 /// In production the resolution is guaranteed to succeed -
321 /// `resolve_component_dependencies` verifies at compile time
322 /// that every `#[depends(<role> = "<slot>")]` declaration
323 /// matches a bound concrete of the right role. A miss here is
324 /// either a test fixture bypassing the compiler pipeline or a
325 /// framework invariant breach.
326 ///
327 /// ```ignore
328 /// // Inside a Component's dispatch_atomic / Contract impl:
329 /// let backend = ctx
330 /// .dependency::<MyCpuBackend>("compute")
331 /// .expect("compiler verified");
332 /// let result = backend.matmul(&lhs, &rhs)?;
333 /// ```
334 pub fn dependency<T: 'static>(&self, slot_name: &str) -> Result<&T, DependencyError> {
335 if self.components.for_slot(slot_name).is_none() {
336 return Err(DependencyError::NotBound {
337 slot: slot_name.to_string(),
338 });
339 }
340 self.components
341 .for_slot_as::<T>(slot_name)
342 .ok_or_else(|| DependencyError::TypeMismatch {
343 slot: slot_name.to_string(),
344 expected: std::any::type_name::<T>(),
345 })
346 }
347
348 /// -ii - convenience helper
349 /// that walks the hierarchical fallback in
350 /// [`crate::framework::rtt_tracker::RttTracker`] to pick the
351 /// effective deadline for a wire round-trip.
352 ///
353 /// `chain_id` + `hop_index` come from the compiler-stamped
354 /// `chain_targets` / `chain_depth` metadata on the current
355 /// NodeProto (read from
356 /// [`Self::current_node_metadata`] via
357 /// [`Self::read_chain_context`]); pass `None` for control-plane
358 /// sends that have no chain context.
359 pub fn estimate_wire_budget_ns(
360 &self,
361 target: crate::ids::NodeSiteId,
362 chain: Option<crate::framework::rtt_tracker::ChainContext>,
363 static_default_ns: u64,
364 ) -> u64 {
365 self.net
366 .rtt
367 .estimate_budget_ns(target, chain, static_default_ns)
368 }
369
370 /// Read the compiler-stamped `chain_targets` + chain hop
371 /// (encoded in `chain_depth` metadata) off the current NodeProto
372 /// and convert them into an [`crate::framework::rtt_tracker::ChainContext`].
373 /// Returns `None` when no chain metadata is present (the Send is
374 /// a fire-and-forget escape hatch or a control-plane round-trip).
375 pub fn read_chain_context(&self) -> Option<crate::framework::rtt_tracker::ChainContext> {
376 let mut chain_targets: Option<&str> = None;
377 let mut hop_index: u8 = 0;
378 for prop in self.current.node_metadata {
379 match prop.key.as_str() {
380 "ai.bytesandbrains.wire.chain_targets" => {
381 chain_targets = Some(prop.value.as_str());
382 }
383 "ai.bytesandbrains.wire.chain_hop_index" => {
384 if let Ok(h) = prop.value.parse::<u8>() {
385 hop_index = h;
386 }
387 }
388 _ => {}
389 }
390 }
391 chain_targets.map(|targets| ChainContext {
392 chain_id: chain_id_from_targets(targets),
393 hop_index,
394 })
395 }
396
397 /// Record a wire round-trip sample into the RTT tracker. Called
398 /// on response landing (after the matching `WireResponseLanded`
399 /// event surfaces the elapsed time) so all the hierarchical-
400 /// fallback EMA tiers stay current.
401 pub fn observe_wire_round_trip(
402 &mut self,
403 target: crate::ids::NodeSiteId,
404 chain: Option<crate::framework::rtt_tracker::ChainContext>,
405 elapsed_ns: u64,
406 now_ns: u64,
407 ) {
408 self.net
409 .rtt
410 .observe_round_trip(target, chain, elapsed_ns, now_ns);
411 }
412
413 /// Mint a fresh `CommandId` via the engine's monotonic counter.
414 /// Used by async-suspending syscalls (`After`, `Sleep`,
415 /// `BootstrapDispatch`).
416 pub fn allocate_command_id(&mut self) -> CommandId {
417 let id = *self.current.next_command_id;
418 *self.current.next_command_id = self.current.next_command_id.saturating_add(1);
419 CommandId::from(id)
420 }
421
422 /// Record a CommandId completion for the engine to drain after
423 /// `dispatch_atomic` returns. Used by `ProtocolRuntime` impls +
424 /// any role impl that returned `DispatchResult::Async`. Invoking
425 /// this in the same call lets the consumer fire in the same poll
426 /// cycle via the engine's catch-up drain.
427 pub fn complete_command(
428 &mut self,
429 cmd_id: CommandId,
430 results: Vec<(String, Box<dyn SlotValue>)>,
431 ) {
432 self.current
433 .pending_completions
434 .push(PendingCompletion { cmd_id, results });
435 }
436
437 /// Convenience for publishing events to the in-Node bus.
438 pub fn publish_bus(&mut self, event: NodeEvent) {
439 self.bus.publish(event);
440 }
441
442 /// Open a completion handle for an async Contract method. The
443 /// caller receives a fresh [`CommandId`] + a shared
444 /// [`CompletionSink`] backed by the Node's ingress queue. The
445 /// user's Contract method holds the handle past the dispatch
446 /// return and calls [`CompletionHandle::complete`] when work
447 /// finishes. The dispatch arm returns
448 /// `DispatchResult::Async(handle.cmd_id())` so the engine parks
449 /// the op until the completion lands.
450 pub fn open_completion<R, E>(&mut self) -> CompletionHandle<R, E>
451 where
452 R: serde::Serialize,
453 E: std::fmt::Display,
454 {
455 let cmd_id = self.allocate_command_id();
456 let sink: Arc<dyn CompletionSink> = self.ingress.clone();
457 CompletionHandle::new(cmd_id, sink)
458 }
459}
460
/// Captured async-completion payload. The engine drains these from
/// the post-dispatch `RuntimeResourceRef` and routes them through
/// `Engine::handle_completion`.
///
/// Instances are created by `RuntimeResourceRef::complete_command`.
pub struct PendingCompletion {
    /// The `CommandId` being fulfilled.
    pub cmd_id: CommandId,
    /// `(name, value)` pairs to write to the suspended Op's output
    /// sites.
    pub results: Vec<(String, Box<dyn SlotValue>)>,
}
471
/// Component-scheduled timer kind. Used by `ProtocolRuntime::on_timer`:
/// protocol impls schedule timers via `ctx.time.scheduler` and receive
/// the matured timer back via this newtype.
///
/// A transparent wrapper around a `u32` kind id; the id's meaning is
/// left to the component that schedules the timer.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ComponentTimerKind(pub u32);

impl ComponentTimerKind {
    /// Wrap an explicit kind id. Usable in `const` contexts.
    pub const fn new(kind: u32) -> Self {
        ComponentTimerKind(kind)
    }

    /// Return the wrapped kind id. Usable in `const` contexts.
    pub const fn as_u32(self) -> u32 {
        let ComponentTimerKind(kind) = self;
        kind
    }
}
489