// bb_ops/syscalls/gates/backoff_rx.rs

//! `BackoffGateRx` syscall op - gate inserted into the RX chain by the
//! compiler pass `bb-compiler/src/insert_backoff_gate_rx.rs`.
//! Consults [`bb_runtime::framework::BackoffTable`] for the inbound
//! envelope's source peer. When the peer is retry-eligible the input
//! value is forwarded; while the peer is still in backoff the envelope
//! is dropped with a `cooldown` reason.
//!
//! The source peer is read from the `current.inbound.src_peer` field of
//! `RuntimeResourceRef`, populated by the engine per inbound envelope
//! (RX gates never consult a NodeProto attribute for peer identity; the
//! runtime delivers the inbound src_peer directly).
12
13use bb_ir::proto::onnx::NodeProto;
14use bb_runtime::atomic::DispatchResult;
15use bb_runtime::bus::OpError;
16use bb_runtime::runtime::RuntimeResourceRef;
17use bb_runtime::slot_value::SlotValue;
18
/// Marker struct for `register_syscall::<BackoffGateRxOp>`.
///
/// Carries no data; it exists only so the op can be named as a type
/// parameter.
pub struct BackoffGateRxOp;
21
/// Domain half of the `(domain, op_type)` registration key.
pub const DOMAIN: &str = "ai.bytesandbrains.syscall";
/// Op type half of the `(domain, op_type)` registration key.
pub const OP_TYPE: &str = "BackoffGateRx";
26
27/// Invoke fn - consults `BackoffTable::should_retry` for the
28/// envelope's source peer and forwards the input on retry-eligible.
29pub fn invoke(
30    _node: &NodeProto,
31    inputs: &[(&str, &dyn SlotValue)],
32    ctx: &mut RuntimeResourceRef<'_>,
33) -> Result<DispatchResult, OpError> {
34    let (_, value) = inputs.first().ok_or_else(|| OpError {
35        detail: "BackoffGateRx requires one input".into(),
36        ..Default::default()
37    })?;
38
39    let Some(src_peer) = ctx.current.inbound.src_peer else {
40        return Err(OpError {
41            detail: "BackoffGateRx: no envelope source peer in runtime context".into(),
42            ..Default::default()
43        });
44    };
45
46    let now_ns = ctx.time.scheduler.now_ns();
47    if ctx.peers.backoff.should_retry(src_peer, now_ns) {
48        Ok(DispatchResult::Immediate(vec![(
49            "value".to_string(),
50            value.clone_boxed(),
51        )]))
52    } else {
53        Err(OpError {
54            detail: format!(
55                "BackoffGateRx dropped envelope from peer {src_peer:?}: reason=cooldown"
56            ),
57            ..Default::default()
58        })
59    }
60}
61
62
63use bb_runtime::registry::OpRegistration as _BbOpsSyscallReg;
64
65inventory::submit! {
66    _BbOpsSyscallReg {
67        domain: DOMAIN,
68        op_type: OP_TYPE,
69        invoke,
70        kind: bb_runtime::registry::RegistrationKind::Syscall,
71    }
72}