Skip to main content

bb_runtime/
runtime.rs

//! Runtime resource handle threaded into every `dispatch_atomic`
//! call.
//!
//! The engine constructs a `RuntimeResourceRef` by split-borrowing
//! the framework-primitive bundle + the bus before each
//! `dispatch_atomic` call. Each framework primitive (and the bus) is
//! reached through its own distinct `&mut`, so the borrow checker
//! enforces field-level exclusivity — an Op may touch any subset.
//!
//! Async-completing impls call `ctx.complete_command(cmd_id,
//! results)`; the engine drains `pending_completions` after the
//! hook returns and routes them through `handle_completion`.
13
14use std::sync::Arc;
15
16use crate::bus::{AllocFailReason, AppIngressErrorKind, AppIngressSource, NodeEvent, TypedBus};
17use crate::completion::{CompletionHandle, CompletionSink};
18use crate::framework::rtt_tracker::{chain_id_from_targets, ChainContext};
19use crate::framework::{
20    rtt_tracker::RttTracker, AddressBook, BackoffTable, BackpressureTracker, EventSource,
21    HoldTable, InboundDedup, OutboundQueue, PeerGate, PeerGovernor, RecordBuffer, RequestTracker,
22    RngU64Source, Scheduler, SerializeQueue,
23};
24use crate::ids::{CommandId, OpRef};
25use crate::ingress::{IngressEvent, IngressQueue, COMPLETION_DETAIL_CAP};
26use crate::slot_value::SlotValue;
27
28impl CompletionSink for IngressQueue {
29    fn complete(&self, cmd_id: CommandId, result_bytes: &[u8]) {
30        // Per Principle 1a: `result_bytes` is borrowed from the
31        // caller's stack/transport buffer. Cap-check, fallibly
32        // reserve framework-owned storage, then copy. The owned
33        // `Vec<u8>` rides into the engine via the `Completion`
34        // variant; cap / alloc failures publish an `AppIngressError`
35        // sibling and drop the result (the parked op times out
36        // naturally — same surface as a missing completion).
37        let byte_count = result_bytes.len();
38        let cap = self.completion_result_cap();
39        if byte_count > cap {
40            let _ = self.push(IngressEvent::AppIngressError {
41                source: AppIngressSource::Completion { command: cmd_id },
42                byte_count,
43                kind: AppIngressErrorKind::PerItemCapExceeded { cap },
44            });
45            return;
46        }
47        let mut owned: Vec<u8> = Vec::new();
48        if crate::fallible::try_reserve_exact(&mut owned, byte_count).is_err() {
49            let _ = self.push(IngressEvent::AppIngressError {
50                source: AppIngressSource::Completion { command: cmd_id },
51                byte_count,
52                kind: AppIngressErrorKind::AllocationFailed {
53                    reason: AllocFailReason::HeapExhausted,
54                },
55            });
56            return;
57        }
58        owned.extend_from_slice(result_bytes);
59        let _ = self.push(IngressEvent::Completion {
60            cmd_id,
61            results: vec![owned],
62        });
63    }
64
65    fn fail(&self, cmd_id: CommandId, detail: &str) {
66        // Push the typed `CompletionFailed` variant so the parked
67        // op fails through `handle_completion_failed` → typed
68        // `OpFailed`, not via success-bytes masquerading as a
69        // completion.
70        //
71        // The detail string is truncated rather than rejected at
72        // `COMPLETION_DETAIL_CAP`: a `Display`-rendered failure must
73        // always land so the component sees a real failure instead
74        // of a missing completion masquerading as a timeout.
75        let truncated = if detail.len() > COMPLETION_DETAIL_CAP {
76            let mut end = COMPLETION_DETAIL_CAP;
77            while end > 0 && !detail.is_char_boundary(end) {
78                end -= 1;
79            }
80            &detail[..end]
81        } else {
82            detail
83        };
84        let owned: String = truncated.to_string();
85        let _ = self.push(IngressEvent::CompletionFailed {
86            cmd_id,
87            detail: owned,
88        });
89    }
90}
91
92/// Per-peer state borrowed mutably during dispatch.
93pub struct PeerCtx<'a> {
94    /// Per-peer concurrency limiter.
95    pub gate: &'a mut PeerGate,
96    /// Per-peer exponential backoff state.
97    pub backoff: &'a mut BackoffTable,
98    /// Peer policy + health source-of-truth.
99    pub governor: &'a mut PeerGovernor,
100    /// `PeerId → (Vec<Address>, ref_count)` registry.
101    pub addresses: &'a mut AddressBook,
102    /// Receiver-side back-pressure tracker. RX gates + ingress
103    /// detection sites consult this to record overload and decide
104    /// between emitting a typed `BackoffNotice` envelope or silently
105    /// dropping.
106    pub backpressure: &'a mut BackpressureTracker,
107}
108
109/// Network/transport state borrowed mutably during dispatch.
110pub struct NetCtx<'a> {
111    /// FIFO of wire envelopes ready to ship on the next outbound drain.
112    pub outbound: &'a mut OutboundQueue,
113    /// Per-NodeSiteId adaptive RTT tracker.
114    pub rtt: &'a mut RttTracker,
115    /// In-flight wire-request → CommandId map.
116    pub requests: &'a mut RequestTracker,
117    /// Sliding-window seen-message tracker.
118    pub dedup: &'a mut InboundDedup,
119    /// Peer-resolution failures captured during this poll cycle.
120    pub pending_peer_resolve_failures: &'a mut Vec<(Option<crate::ids::PeerId>, crate::ids::OpRef)>,
121}
122
123/// Time / scheduling state borrowed mutably during dispatch.
124pub struct TimeCtx<'a> {
125    /// Sorted timer heap.
126    pub scheduler: &'a mut Scheduler,
127}
128
129/// Syscall-side state (storage, RNG, latches, app-event drain).
130pub struct SyscallCtx<'a> {
131    /// Named-FIFO map for `Serialize.Enqueue` / `Dequeue`.
132    pub serialize_queue: &'a mut SerializeQueue,
133    /// Named-slot value buffer for `Hold.Stash` / `Flush`.
134    pub hold_table: &'a mut HoldTable,
135    /// Per-name bounded ring buffer for `Record`.
136    pub record_buffer: &'a mut RecordBuffer,
137    /// Registered `EventKind → ComponentTag` subscriptions.
138    pub event_source: &'a mut EventSource,
139    /// Per-Node counters bumped by `IncrMetric`.
140    pub counters: &'a mut std::collections::HashMap<String, u64>,
141    /// Per-group first-arrival latch for the `Any` syscall.
142    pub any_fired_groups: &'a mut std::collections::HashSet<String>,
143    /// Per-`(OpRef, ExecId)` latch for the `DeadlineMatch` syscall.
144    pub deadline_match_fired: &'a mut std::collections::HashSet<(u64, u64)>,
145    /// `u64` RNG source for the `RngU64` syscall.
146    pub rng: &'a mut dyn RngU64Source,
147    /// App events pending emission on the next poll's outbound drain.
148    pub pending_app_events: &'a mut Vec<crate::bus::AppEvent>,
149}
150
151/// Inbound-envelope context captured at delivery time and threaded
152/// into every op dispatched as part of the cascade.
153#[derive(Clone, Copy, Debug, Default)]
154pub struct InboundCtx {
155    /// The `PeerId` of the inbound envelope's transport-reported
156    /// source. `None` outside the inbound delivery path.
157    pub src_peer: Option<crate::ids::PeerId>,
158    /// The inbound envelope's `wire_req_id` correlation token.
159    pub wire_req_id: Option<u64>,
160    /// Engine-clock timestamp the envelope arrived at.
161    pub arrival_ns: Option<u64>,
162    /// Remaining deadline budget propagated by the sender.
163    pub remaining_deadline_ns: Option<u64>,
164}
165
166/// State scoped to the currently-dispatching op (NodeProto-level
167/// metadata, the op's identity, completion drain, command-id mint).
168pub struct CurrentCallCtx<'a> {
169    /// The `OpRef` of the Op currently being dispatched.
170    pub op_ref: OpRef,
171    /// The `ExecId` this dispatch belongs to. Syscalls that latch
172    /// per-execution (`DeadlineMatch`, `Any`) key on
173    /// `(op_ref, exec_id)` so a fresh execution starts unlatched.
174    pub exec_id: crate::ids::ExecId,
175    /// The Node's own `PeerId`.
176    pub self_peer: crate::ids::PeerId,
177    /// Attributes of the NodeProto being dispatched.
178    pub node_attributes: &'a [bb_ir::proto::onnx::AttributeProto],
179    /// Metadata_props of the NodeProto being dispatched.
180    pub node_metadata: &'a [bb_ir::proto::onnx::StringStringEntryProto],
181    /// Inbound-envelope context (all four `inbound_*` fields).
182    pub inbound: InboundCtx,
183    /// Completions captured during this dispatch.
184    pub pending_completions: Vec<PendingCompletion>,
185    /// Engine's monotonic CommandId source.
186    pub next_command_id: &'a mut u64,
187}
188
189/// Engine-resource handle threaded into every `dispatch_atomic`
190/// call. Per `docs/ENGINE.md` §10. Fields are grouped by concern -
191/// `peers`/`net`/`time`/`syscall` carry the framework primitive
192/// references; `current` carries per-op state; `bus`/`ingress`
193/// stay top-level; `components` exposes the cross-component
194/// read-only surface.
195pub struct RuntimeResourceRef<'a> {
196    /// Per-peer state (gate, backoff, governor, addresses).
197    pub peers: PeerCtx<'a>,
198    /// Network/transport state (outbound queue, RTT, requests, dedup).
199    pub net: NetCtx<'a>,
200    /// Time / scheduling state.
201    pub time: TimeCtx<'a>,
202    /// Syscall-side state (storage, RNG, latches, app-event drain).
203    pub syscall: SyscallCtx<'a>,
204    /// The in-Node typed event bus.
205    pub bus: &'a mut TypedBus,
206    /// Shared handle to the Node's ingress queue.
207    pub ingress: Arc<IngressQueue>,
208    /// Read-only view onto sibling components registered on the Node.
209    pub components: ComponentsView<'a>,
210    /// State scoped to the currently-dispatching op.
211    pub current: CurrentCallCtx<'a>,
212}
213
214/// Read-only view onto sibling components registered on the Node.
215/// Constructed by the engine at dispatch time from
216/// `&engine.components` + `&engine.slots`. The Vec is indexed by
217/// `ComponentRef.as_u32() as usize`; the slot the currently-
218/// dispatching component lives in is `None` for the duration of
219/// the dispatch (take-and-restore in `invoke_atomic`), so callers
220/// can't accidentally re-enter themselves.
221#[derive(Default)]
222pub struct ComponentsView<'a> {
223    /// All registered components indexed by `ComponentRef.as_u32()`,
224    /// borrowed from `engine.components`. `None` outside engine
225    /// context (test setups bypassing the registry).
226    pub instances: Option<&'a [Option<Box<dyn crate::component::ErasedComponent>>]>,
227    /// Author-chosen-slot-name → `ComponentRef` map, borrowed from
228    /// `engine.slots`. THE canonical dependency-resolution surface.
229    /// `None` outside engine context.
230    pub slots: Option<&'a std::collections::HashMap<String, crate::ids::ComponentRef>>,
231}
232
233impl ComponentsView<'_> {
234    /// Look up the component bound at `slot_name`. The generic
235    /// dependency-resolution surface — Components reach their
236    /// declared dependencies through this accessor. Returns `None`
237    /// when no slot of that name is bound or when the view has no
238    /// engine context.
239    pub fn for_slot(&self, slot_name: &str) -> Option<&dyn crate::component::ErasedComponent> {
240        let slots = self.slots?;
241        let instances = self.instances?;
242        let cref = slots.get(slot_name)?;
243        let idx = cref.as_u32() as usize;
244        instances.get(idx)?.as_deref()
245    }
246
247    /// Look up the component bound at `slot_name` AND downcast it
248    /// to `&T`. The typed counterpart to [`Self::for_slot`].
249    /// Returns `None` when the slot is unbound OR when the bound
250    /// concrete is not a `T`.
251    ///
252    /// In production this is reached through
253    /// [`crate::runtime::RuntimeResourceRef::dependency`], which
254    /// wraps the lookup in [`DependencyError`] variants for typed
255    /// error reporting.
256    pub fn for_slot_as<T: 'static>(&self, slot_name: &str) -> Option<&T> {
257        let erased = self.for_slot(slot_name)?;
258        let any: &dyn std::any::Any = erased;
259        any.downcast_ref::<T>()
260    }
261}
262
/// Failure modes of [`RuntimeResourceRef::dependency`].
///
/// In production every dependency is checked at compile time by
/// `resolve_component_dependencies`. A runtime miss therefore means a
/// framework invariant was breached, e.g. someone bypassed
/// `Node::install` to register a custom slot mapping.
///
/// The failure is returned as a value instead of a panic, so test
/// fixtures and introspection tooling can probe the surface without
/// aborting the process.
#[derive(Debug)]
pub enum DependencyError {
    /// Nothing is bound at the requested slot.
    NotBound {
        /// Slot name the caller asked for.
        slot: String,
    },
    /// A component is bound, but it is a different concrete than the
    /// caller expected, so the downcast failed.
    TypeMismatch {
        /// Slot name the caller asked for.
        slot: String,
        /// `std::any::type_name` of the type the caller expected.
        expected: &'static str,
    },
}

impl std::fmt::Display for DependencyError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DependencyError::NotBound { slot } => {
                write!(f, "no component bound at slot `{}`", slot)
            }
            DependencyError::TypeMismatch { slot, expected } => {
                write!(f, "component at slot `{}` is not a `{}`", slot, expected)
            }
        }
    }
}

impl std::error::Error for DependencyError {}
301
302impl RuntimeResourceRef<'_> {
303    /// Ordered local-address bag for this Node. Reads the
304    /// AddressBook entry keyed by `self.current.self_peer`; returns
305    /// an empty slice when no local addresses are registered.
306    /// Wire ops + identity-bearing protocol replies (Announce,
307    /// Handshake) stamp this onto their outbound envelopes so
308    /// receivers can dial back on every reachable interface.
309    pub fn local_addresses(&self) -> &[crate::framework::Address] {
310        self.peers
311            .addresses
312            .lookup(self.current.self_peer)
313            .unwrap_or(&[])
314    }
315
316    /// Typed accessor for an author-declared dependency. Resolves
317    /// `slot_name` against the engine's generic slot registry and
318    /// downcasts the bound `ErasedComponent` to `&T`.
319    ///
320    /// In production the resolution is guaranteed to succeed -
321    /// `resolve_component_dependencies` verifies at compile time
322    /// that every `#[depends(<role> = "<slot>")]` declaration
323    /// matches a bound concrete of the right role. A miss here is
324    /// either a test fixture bypassing the compiler pipeline or a
325    /// framework invariant breach.
326    ///
327    /// ```ignore
328    /// // Inside a Component's dispatch_atomic / Contract impl:
329    /// let backend = ctx
330    ///     .dependency::<MyCpuBackend>("compute")
331    ///     .expect("compiler verified");
332    /// let result = backend.matmul(&lhs, &rhs)?;
333    /// ```
334    pub fn dependency<T: 'static>(&self, slot_name: &str) -> Result<&T, DependencyError> {
335        if self.components.for_slot(slot_name).is_none() {
336            return Err(DependencyError::NotBound {
337                slot: slot_name.to_string(),
338            });
339        }
340        self.components
341            .for_slot_as::<T>(slot_name)
342            .ok_or_else(|| DependencyError::TypeMismatch {
343                slot: slot_name.to_string(),
344                expected: std::any::type_name::<T>(),
345            })
346    }
347
348    /// -ii - convenience helper
349    /// that walks the hierarchical fallback in
350    /// [`crate::framework::rtt_tracker::RttTracker`] to pick the
351    /// effective deadline for a wire round-trip.
352    ///
353    /// `chain_id` + `hop_index` come from the compiler-stamped
354    /// `chain_targets` / `chain_depth` metadata on the current
355    /// NodeProto (read from
356    /// [`Self::current_node_metadata`] via
357    /// [`Self::read_chain_context`]); pass `None` for control-plane
358    /// sends that have no chain context.
359    pub fn estimate_wire_budget_ns(
360        &self,
361        target: crate::ids::NodeSiteId,
362        chain: Option<crate::framework::rtt_tracker::ChainContext>,
363        static_default_ns: u64,
364    ) -> u64 {
365        self.net
366            .rtt
367            .estimate_budget_ns(target, chain, static_default_ns)
368    }
369
370    /// Read the compiler-stamped `chain_targets` + chain hop
371    /// (encoded in `chain_depth` metadata) off the current NodeProto
372    /// and convert them into an [`crate::framework::rtt_tracker::ChainContext`].
373    /// Returns `None` when no chain metadata is present (the Send is
374    /// a fire-and-forget escape hatch or a control-plane round-trip).
375    pub fn read_chain_context(&self) -> Option<crate::framework::rtt_tracker::ChainContext> {
376        let mut chain_targets: Option<&str> = None;
377        let mut hop_index: u8 = 0;
378        for prop in self.current.node_metadata {
379            match prop.key.as_str() {
380                "ai.bytesandbrains.wire.chain_targets" => {
381                    chain_targets = Some(prop.value.as_str());
382                }
383                "ai.bytesandbrains.wire.chain_hop_index" => {
384                    if let Ok(h) = prop.value.parse::<u8>() {
385                        hop_index = h;
386                    }
387                }
388                _ => {}
389            }
390        }
391        chain_targets.map(|targets| ChainContext {
392            chain_id: chain_id_from_targets(targets),
393            hop_index,
394        })
395    }
396
397    /// Record a wire round-trip sample into the RTT tracker. Called
398    /// on response landing (after the matching `WireResponseLanded`
399    /// event surfaces the elapsed time) so all the hierarchical-
400    /// fallback EMA tiers stay current.
401    pub fn observe_wire_round_trip(
402        &mut self,
403        target: crate::ids::NodeSiteId,
404        chain: Option<crate::framework::rtt_tracker::ChainContext>,
405        elapsed_ns: u64,
406        now_ns: u64,
407    ) {
408        self.net
409            .rtt
410            .observe_round_trip(target, chain, elapsed_ns, now_ns);
411    }
412
413    /// Mint a fresh `CommandId` via the engine's monotonic counter.
414    /// Used by async-suspending syscalls (`After`, `Sleep`,
415    /// `BootstrapDispatch`).
416    pub fn allocate_command_id(&mut self) -> CommandId {
417        let id = *self.current.next_command_id;
418        *self.current.next_command_id = self.current.next_command_id.saturating_add(1);
419        CommandId::from(id)
420    }
421
422    /// Record a CommandId completion for the engine to drain after
423    /// `dispatch_atomic` returns. Used by `ProtocolRuntime` impls +
424    /// any role impl that returned `DispatchResult::Async`. Invoking
425    /// this in the same call lets the consumer fire in the same poll
426    /// cycle via the engine's catch-up drain.
427    pub fn complete_command(
428        &mut self,
429        cmd_id: CommandId,
430        results: Vec<(String, Box<dyn SlotValue>)>,
431    ) {
432        self.current
433            .pending_completions
434            .push(PendingCompletion { cmd_id, results });
435    }
436
437    /// Convenience for publishing events to the in-Node bus.
438    pub fn publish_bus(&mut self, event: NodeEvent) {
439        self.bus.publish(event);
440    }
441
442    /// Open a completion handle for an async Contract method. The
443    /// caller receives a fresh [`CommandId`] + a shared
444    /// [`CompletionSink`] backed by the Node's ingress queue. The
445    /// user's Contract method holds the handle past the dispatch
446    /// return and calls [`CompletionHandle::complete`] when work
447    /// finishes. The dispatch arm returns
448    /// `DispatchResult::Async(handle.cmd_id())` so the engine parks
449    /// the op until the completion lands.
450    pub fn open_completion<R, E>(&mut self) -> CompletionHandle<R, E>
451    where
452        R: serde::Serialize,
453        E: std::fmt::Display,
454    {
455        let cmd_id = self.allocate_command_id();
456        let sink: Arc<dyn CompletionSink> = self.ingress.clone();
457        CompletionHandle::new(cmd_id, sink)
458    }
459}
460
461/// Captured async-completion payload. The engine drains these from
462/// the post-dispatch `RuntimeResourceRef` and routes them through
463/// `Engine::handle_completion`.
464pub struct PendingCompletion {
465    /// The `CommandId` being fulfilled.
466    pub cmd_id: CommandId,
467    /// `(name, value)` pairs to write to the suspended Op's output
468    /// sites.
469    pub results: Vec<(String, Box<dyn SlotValue>)>,
470}
471
/// Newtype identifying a component-scheduled timer kind.
///
/// Consumed by `ProtocolRuntime::on_timer`: protocol impls schedule
/// timers through `ctx.time.scheduler` and get the matured timer
/// handed back as this value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ComponentTimerKind(pub u32);

impl ComponentTimerKind {
    /// Wrap an explicit kind id.
    pub const fn new(kind: u32) -> Self {
        ComponentTimerKind(kind)
    }

    /// Unwrap the kind id.
    pub const fn as_u32(self) -> u32 {
        let ComponentTimerKind(raw) = self;
        raw
    }
}
489