Skip to main content

bb_compiler/
insert_backoff_gate_rx.rs

//! Compiler pass: pairs every synthesized `wire::Recv` op with a
//! downstream `BackoffGateRx`. Runs after `insert_peer_health_gate_rx`,
//! so the RX chain order is `Recv → DedupGateRx →
//! PeerHealthGateRx → BackoffGateRx`.
//!
//! The gate reads the inbound envelope's source peer from
//! `RuntimeResourceRef::envelope_src_peer` and consults
//! [`crate::framework::BackoffTable::should_retry`]. Peers in
//! cooldown fail with a `cooldown` label.
11
12use crate::error::CompileError;
13use crate::rx_chain::{rx_chain_head, set_rx_chain_head};
14use bb_ir::proto::onnx::{GraphProto, NodeProto, StringStringEntryProto};
15use bb_ir::syscall_ids::{OP_BACKOFF_GATE_RX as GATE_OP_TYPE, SYSCALL_DOMAIN as GATE_DOMAIN};
16
/// Idempotence stamp on the gated Recv.
///
/// `insert_backoff_gate_rx` skips any Recv whose metadata already contains
/// this key, and writes it with the value `"true"` once a gate has been
/// attached.
pub const GATED_KEY: &str = "ai.bytesandbrains.backoff_rx_gated";

/// Node domain that, together with [`WIRE_RECV_OP`], identifies the Recv
/// nodes this pass gates.
const WIRE_DOMAIN: &str = "ai.bytesandbrains.wire";
/// Node op type that, together with [`WIRE_DOMAIN`], identifies the Recv
/// nodes this pass gates.
const WIRE_RECV_OP: &str = "Recv";
22
23/// Insert a `BackoffGateRx` after every synthesized `wire::Recv`.
24pub fn insert_backoff_gate_rx(sub_graph: &mut GraphProto) -> Result<(), CompileError> {
25    let recv_indices: Vec<usize> = sub_graph
26        .node
27        .iter()
28        .enumerate()
29        .filter_map(|(i, n)| (n.domain == WIRE_DOMAIN && n.op_type == WIRE_RECV_OP).then_some(i))
30        .collect();
31
32    let mut new_gates: Vec<NodeProto> = Vec::new();
33
34    for recv_idx in recv_indices {
35        if metadata_value(&sub_graph.node[recv_idx], GATED_KEY).is_some() {
36            continue;
37        }
38        let recv_name = sub_graph.node[recv_idx].name.clone();
39        let head = rx_chain_head(&sub_graph.node[recv_idx]);
40        let new_head = format!("{recv_name}#backoff_rx_out");
41
42        new_gates.push(build_gate_node(&recv_name, &head, &new_head));
43
44        rewire_consumers(sub_graph, recv_idx, &head, &new_head);
45
46        set_metadata(
47            &mut sub_graph.node[recv_idx].metadata_props,
48            GATED_KEY,
49            "true",
50        );
51        set_rx_chain_head(&mut sub_graph.node[recv_idx], &new_head);
52    }
53
54    sub_graph.node.extend(new_gates);
55    Ok(())
56}
57
58fn build_gate_node(source_name: &str, input: &str, output: &str) -> NodeProto {
59    NodeProto {
60        op_type: GATE_OP_TYPE.to_string(),
61        domain: GATE_DOMAIN.to_string(),
62        name: format!("BackoffGateRx@{source_name}"),
63        input: vec![input.to_string()],
64        output: vec![output.to_string()],
65        metadata_props: vec![StringStringEntryProto {
66            key: "ai.bytesandbrains.backoff_rx_source".to_string(),
67            value: source_name.to_string(),
68        }],
69        ..Default::default()
70    }
71}
72
73fn rewire_consumers(sub_graph: &mut GraphProto, recv_idx: usize, old_name: &str, new_name: &str) {
74    for (idx, node) in sub_graph.node.iter_mut().enumerate() {
75        if idx == recv_idx {
76            continue;
77        }
78        for inp in node.input.iter_mut() {
79            if inp == old_name {
80                *inp = new_name.to_string();
81            }
82        }
83    }
84}
85
86fn metadata_value(node: &NodeProto, key: &str) -> Option<String> {
87    node.metadata_props
88        .iter()
89        .find(|p| p.key == key)
90        .map(|p| p.value.clone())
91}
92
93fn set_metadata(props: &mut Vec<StringStringEntryProto>, key: &str, value: &str) {
94    if let Some(existing) = props.iter_mut().find(|p| p.key == key) {
95        existing.value = value.to_string();
96        return;
97    }
98    props.push(StringStringEntryProto {
99        key: key.to_string(),
100        value: value.to_string(),
101    });
102}
103