bb_ops/syscalls/gates/
backoff_tx.rs1use bb_ir::proto::onnx::NodeProto;
12use bb_ir::wire_shape;
13use bb_runtime::atomic::DispatchResult;
14use bb_runtime::bus::OpError;
15use bb_runtime::ids::PeerId;
16use bb_runtime::runtime::RuntimeResourceRef;
17use bb_runtime::slot_value::SlotValue;
18use bb_runtime::syscall::values::TriggerValue;
19
/// Unit type naming the `BackoffGateTx` syscall op.
///
/// It carries no state; the op's behaviour lives in the free function
/// [`invoke`]. The common traits are derived so the type can be logged,
/// copied, and default-constructed like any other zero-sized marker.
#[derive(Debug, Clone, Copy, Default)]
pub struct BackoffGateTxOp;
22
/// Operator domain string passed as `domain` in this file's op registration.
pub const DOMAIN: &str = "ai.bytesandbrains.syscall";
/// Operator type name passed as `op_type` in this file's op registration.
pub const OP_TYPE: &str = "BackoffGateTx";
/// Alias of [`wire_shape::ATTR_PEER`]; `invoke` names it in the error it
/// returns when the node's peer attribute cannot be read.
pub const ATTR_PEER: &str = wire_shape::ATTR_PEER;
29
30pub fn invoke(
35 node: &NodeProto,
36 _inputs: &[(&str, &dyn SlotValue)],
37 ctx: &mut RuntimeResourceRef<'_>,
38) -> Result<DispatchResult, OpError> {
39 let peer = read_peer_attr(node).ok_or_else(|| OpError {
40 detail: format!("BackoffGateTx missing required `{ATTR_PEER}` attribute"),
41 ..Default::default()
42 })?;
43 let now_ns = ctx.time.scheduler.now_ns();
44
45 if ctx.peers.backoff.should_retry(peer, now_ns) {
46 Ok(DispatchResult::Immediate(vec![(
47 "trigger".to_string(),
48 Box::new(TriggerValue),
49 )]))
50 } else {
51 Err(OpError {
52 detail: format!("BackoffGateTx held send to peer {peer:?}: reason=cooldown"),
53 ..Default::default()
54 })
55 }
56}
57
58fn read_peer_attr(node: &NodeProto) -> Option<PeerId> {
61 wire_shape::read_peer_bytes(node).and_then(|bytes| PeerId::from_bytes(bytes).ok())
62}
63
64
65use bb_runtime::registry::OpRegistration as _BbOpsSyscallReg;
66
// Submits this op's registration record through `inventory`: `invoke` is
// registered under (`DOMAIN`, `OP_TYPE`) with the `Syscall` registration kind.
inventory::submit! {
    _BbOpsSyscallReg {
        domain: DOMAIN,
        op_type: OP_TYPE,
        invoke,
        kind: bb_runtime::registry::RegistrationKind::Syscall,
    }
}