Skip to main content

bb_ops/syscalls/gates/
backoff_tx.rs

//! `BackoffGateTx` syscall op: a gate that the compiler pass
//! `bb-compiler/src/insert_backoff_gate_tx.rs` inserts between
//! `PeerHealthGateTx` and `wire::Send`. It consults
//! [`bb_runtime::framework::BackoffTable`] and drops the send if the
//! destination peer is still in cooldown.
//!
//! The destination peer is carried on the gate `NodeProto` as the
//! `ATTR_PEER` attribute, encoded as multihash bytes in `attribute.s`
//! (see [`bb_ir::wire_shape::ATTR_PEER`] and
//! [`bb_ir::wire_shape::read_peer_bytes`]).
10
11use bb_ir::proto::onnx::NodeProto;
12use bb_ir::wire_shape;
13use bb_runtime::atomic::DispatchResult;
14use bb_runtime::bus::OpError;
15use bb_runtime::ids::PeerId;
16use bb_runtime::runtime::RuntimeResourceRef;
17use bb_runtime::slot_value::SlotValue;
18use bb_runtime::syscall::values::TriggerValue;
19
/// Marker struct for `register_syscall::<BackoffGateTxOp>`.
///
/// Carries no data; it exists only to name the op at the type level.
/// The common traits are derived so the marker can be logged, copied,
/// and default-constructed like any other zero-sized tag.
#[derive(Debug, Clone, Copy, Default)]
pub struct BackoffGateTxOp;
22
/// Domain half of the `(domain, op_type)` key this op registers under.
pub const DOMAIN: &str = "ai.bytesandbrains.syscall";
/// Op-type half of the `(domain, op_type)` registration key.
pub const OP_TYPE: &str = "BackoffGateTx";
27/// Re-export the canonical ATTR_PEER constant from `bb_ir::wire_shape`.
28pub const ATTR_PEER: &str = wire_shape::ATTR_PEER;
29
30/// Invoke fn - consults `BackoffTable::should_retry` for the
31/// destination peer. Emits a `TriggerValue` when the peer is retry-
32/// able; fails with a `cooldown` reason when still in the backoff
33/// window.
34pub fn invoke(
35    node: &NodeProto,
36    _inputs: &[(&str, &dyn SlotValue)],
37    ctx: &mut RuntimeResourceRef<'_>,
38) -> Result<DispatchResult, OpError> {
39    let peer = read_peer_attr(node).ok_or_else(|| OpError {
40        detail: format!("BackoffGateTx missing required `{ATTR_PEER}` attribute"),
41        ..Default::default()
42    })?;
43    let now_ns = ctx.time.scheduler.now_ns();
44
45    if ctx.peers.backoff.should_retry(peer, now_ns) {
46        Ok(DispatchResult::Immediate(vec![(
47            "trigger".to_string(),
48            Box::new(TriggerValue),
49        )]))
50    } else {
51        Err(OpError {
52            detail: format!("BackoffGateTx held send to peer {peer:?}: reason=cooldown"),
53            ..Default::default()
54        })
55    }
56}
57
58/// Read the gate's destination peer from `ATTR_PEER` (multihash
59/// bytes on `attribute.s`).
60fn read_peer_attr(node: &NodeProto) -> Option<PeerId> {
61    wire_shape::read_peer_bytes(node).and_then(|bytes| PeerId::from_bytes(bytes).ok())
62}
63
64
65use bb_runtime::registry::OpRegistration as _BbOpsSyscallReg;
66
67inventory::submit! {
68    _BbOpsSyscallReg {
69        domain: DOMAIN,
70        op_type: OP_TYPE,
71        invoke,
72        kind: bb_runtime::registry::RegistrationKind::Syscall,
73    }
74}