bb_ops/syscalls/gates/
backoff_rx.rs1use bb_ir::proto::onnx::NodeProto;
14use bb_runtime::atomic::DispatchResult;
15use bb_runtime::bus::OpError;
16use bb_runtime::runtime::RuntimeResourceRef;
17use bb_runtime::slot_value::SlotValue;
18
19pub struct BackoffGateRxOp;
21
22pub const DOMAIN: &str = "ai.bytesandbrains.syscall";
24pub const OP_TYPE: &str = "BackoffGateRx";
26
27pub fn invoke(
30 _node: &NodeProto,
31 inputs: &[(&str, &dyn SlotValue)],
32 ctx: &mut RuntimeResourceRef<'_>,
33) -> Result<DispatchResult, OpError> {
34 let (_, value) = inputs.first().ok_or_else(|| OpError {
35 detail: "BackoffGateRx requires one input".into(),
36 ..Default::default()
37 })?;
38
39 let Some(src_peer) = ctx.current.inbound.src_peer else {
40 return Err(OpError {
41 detail: "BackoffGateRx: no envelope source peer in runtime context".into(),
42 ..Default::default()
43 });
44 };
45
46 let now_ns = ctx.time.scheduler.now_ns();
47 if ctx.peers.backoff.should_retry(src_peer, now_ns) {
48 Ok(DispatchResult::Immediate(vec![(
49 "value".to_string(),
50 value.clone_boxed(),
51 )]))
52 } else {
53 Err(OpError {
54 detail: format!(
55 "BackoffGateRx dropped envelope from peer {src_peer:?}: reason=cooldown"
56 ),
57 ..Default::default()
58 })
59 }
60}
61
62
63use bb_runtime::registry::OpRegistration as _BbOpsSyscallReg;
64
65inventory::submit! {
66 _BbOpsSyscallReg {
67 domain: DOMAIN,
68 op_type: OP_TYPE,
69 invoke,
70 kind: bb_runtime::registry::RegistrationKind::Syscall,
71 }
72}