Skip to main content

bb_compiler/
insert_backoff_gate_tx.rs

1//! Compiler pass - pair every `wire::Send` op with an upstream
2//! `BackoffGateTx` syscall. The gate consults
3//! [`crate::framework::BackoffTable`] and drops sends to peers
4//! currently in cooldown.
5//!
6//! Runs after `insert_peer_health_gate_tx` so the chain is
7//! `PeerHealthGateTx → BackoffGateTx → wire::Send`: peer-health
8//! denies fire first (blocklist / allowlist), backoff cooldown
9//! second (transient retry suppression).
10//!
11
12use crate::error::CompileError;
13use bb_ir::proto::onnx::{GraphProto, NodeProto, StringStringEntryProto};
14use bb_ir::syscall_ids::{OP_BACKOFF_GATE_TX as GATE_OP_TYPE, SYSCALL_DOMAIN as GATE_DOMAIN};
15use bb_ir::wire_shape;
16
17/// Idempotence stamp on the gated Send.
18pub const GATED_KEY: &str = "ai.bytesandbrains.backoff_tx_gated";
19
20const WIRE_DOMAIN: &str = "ai.bytesandbrains.wire";
21const WIRE_SEND_OP: &str = "Send";
22
23/// Insert a `BackoffGateTx` upstream of every `wire::Send` that
24/// carries a destination `peer` attribute.
25pub fn insert_backoff_gate_tx(sub_graph: &mut GraphProto) -> Result<(), CompileError> {
26    let mut gates: Vec<NodeProto> = Vec::new();
27
28    for node in sub_graph.node.iter_mut() {
29        if node.domain != WIRE_DOMAIN || node.op_type != WIRE_SEND_OP {
30            continue;
31        }
32        if metadata_value(node, GATED_KEY).is_some() {
33            continue;
34        }
35        let Some(peer) = read_peer(node) else {
36            continue;
37        };
38        let Some(gated_input) = node.input.first().cloned() else {
39            continue;
40        };
41
42        let gate_output = format!("{}#backoff_tx_gated", node.name);
43        gates.push(build_gate_node(
44            &node.name,
45            &gated_input,
46            &gate_output,
47            &peer,
48        ));
49
50        node.input[0] = gate_output;
51        set_metadata(&mut node.metadata_props, GATED_KEY, "true");
52    }
53
54    sub_graph.node.extend(gates);
55    Ok(())
56}
57
58fn build_gate_node(
59    source_name: &str,
60    trigger_input: &str,
61    gate_output: &str,
62    peer_bytes: &[u8],
63) -> NodeProto {
64    // Stamp the destination peer as `attribute.s` (multihash bytes);
65    // an int encoding would lose multihashes that exceed 8 bytes.
66    let mut node = NodeProto {
67        op_type: GATE_OP_TYPE.to_string(),
68        domain: GATE_DOMAIN.to_string(),
69        name: format!("BackoffGateTx@{source_name}"),
70        input: vec![trigger_input.to_string()],
71        output: vec![gate_output.to_string()],
72        metadata_props: vec![
73            StringStringEntryProto {
74                key: "ai.bytesandbrains.backoff_tx_source".to_string(),
75                value: source_name.to_string(),
76            },
77            StringStringEntryProto {
78                key: GATED_KEY.to_string(),
79                value: "true".to_string(),
80            },
81        ],
82        ..Default::default()
83    };
84    wire_shape::stamp_peer_bytes(&mut node, peer_bytes.to_vec());
85    node
86}
87
88/// Read the destination peer bytes from the Send.
89fn read_peer(node: &NodeProto) -> Option<Vec<u8>> {
90    wire_shape::read_peer_bytes(node).map(|bytes| bytes.to_vec())
91}
92
93fn metadata_value(node: &NodeProto, key: &str) -> Option<String> {
94    node.metadata_props
95        .iter()
96        .find(|p| p.key == key)
97        .map(|p| p.value.clone())
98}
99
100fn set_metadata(props: &mut Vec<StringStringEntryProto>, key: &str, value: &str) {
101    if let Some(existing) = props.iter_mut().find(|p| p.key == key) {
102        existing.value = value.to_string();
103        return;
104    }
105    props.push(StringStringEntryProto {
106        key: key.to_string(),
107        value: value.to_string(),
108    });
109}
110