Skip to main content

bb_compiler/
insert_async_deadlines.rs

1//! Pair every NodeProto carrying a `deadline_ns` attribute with an
2//! upstream `DeadlineCheck` syscall gate. The check writes to a new
3//! sibling `<node>#__trigger_deadline` slot so the protected node
4//! fires only after both its original payload AND the deadline gate.
5//!
6//! Idempotent — re-running on an already-gated graph is a no-op.
7
8use crate::error::CompileError;
9use bb_ir::proto::onnx::{AttributeProto, GraphProto, NodeProto, StringStringEntryProto};
10use bb_ir::syscall_ids::{
11    ATTR_DEADLINE_NS, OP_DEADLINE_CHECK as DEADLINE_OP_TYPE, SYSCALL_DOMAIN as DEADLINE_DOMAIN,
12};
13
/// Metadata key stamped (with the value `"true"`) on every node this
/// pass has already handled. A node that carries the key is skipped
/// on later runs, which is what keeps the pass idempotent.
pub const GATED_KEY: &str = "ai.bytesandbrains.deadline_gated";

/// Appended to a protected node's name to build the name of the extra
/// input slot that the node's `DeadlineCheck` gate writes to.
pub const TRIGGER_DEADLINE_SUFFIX: &str = "#__trigger_deadline";
22
23/// Insert a `DeadlineCheck` upstream of every `deadline_ns`-bearing
24/// node in `sub_graph`. Pure (the input graph is mutated in place
25/// but the function has no side effects beyond that).
26///
27/// The `DeadlineCheck` writes to a sibling input slot
28/// `<protected>#__trigger_deadline`; the engine fires the protected
29/// node only after every input (original payloads plus the new
30/// trigger slot) has been filled.
31pub fn insert_async_deadlines(sub_graph: &mut GraphProto) -> Result<(), CompileError> {
32    let mut gates: Vec<NodeProto> = Vec::new();
33
34    for node in sub_graph.node.iter_mut() {
35        if metadata_value(node, GATED_KEY).is_some() {
36            continue;
37        }
38        let Some(deadline_ns) = read_deadline(node) else {
39            continue;
40        };
41        // The gate observes the protected node's first input as a
42        // proxy for "this node is preparing to fire". When the node
43        // has no inputs there is nothing to observe, so skip.
44        let Some(trigger_proxy) = node.input.first().cloned() else {
45            continue;
46        };
47
48        let trigger_slot = format!("{}{TRIGGER_DEADLINE_SUFFIX}", node.name);
49        gates.push(build_deadline_check_node(
50            &node.name,
51            &trigger_proxy,
52            &trigger_slot,
53            deadline_ns,
54        ));
55
56        // Append the sibling deadline-trigger slot; existing inputs
57        // stay untouched.
58        node.input.push(trigger_slot);
59        set_metadata(&mut node.metadata_props, GATED_KEY, "true");
60    }
61
62    sub_graph.node.extend(gates);
63    Ok(())
64}
65
66fn build_deadline_check_node(
67    source_name: &str,
68    trigger_input: &str,
69    gate_output: &str,
70    deadline_ns: i64,
71) -> NodeProto {
72    NodeProto {
73        op_type: DEADLINE_OP_TYPE.to_string(),
74        domain: DEADLINE_DOMAIN.to_string(),
75        name: format!("DeadlineCheck@{source_name}"),
76        input: vec![trigger_input.to_string()],
77        output: vec![gate_output.to_string()],
78        attribute: vec![AttributeProto {
79            name: ATTR_DEADLINE_NS.to_string(),
80            i: deadline_ns,
81            r#type: bb_ir::proto::onnx::attribute_proto::AttributeType::Int as i32,
82            ..Default::default()
83        }],
84        metadata_props: vec![
85            StringStringEntryProto {
86                key: "ai.bytesandbrains.deadline_source".to_string(),
87                value: source_name.to_string(),
88            },
89            // Mark the inserted gate as already-gated so a second
90            // pass run doesn't try to gate its own previous output.
91            StringStringEntryProto {
92                key: GATED_KEY.to_string(),
93                value: "true".to_string(),
94            },
95        ],
96        ..Default::default()
97    }
98}
99
100fn read_deadline(node: &NodeProto) -> Option<i64> {
101    node.attribute
102        .iter()
103        .find(|a| a.name == ATTR_DEADLINE_NS)
104        .map(|a| a.i)
105}
106
107fn metadata_value(node: &NodeProto, key: &str) -> Option<String> {
108    node.metadata_props
109        .iter()
110        .find(|p| p.key == key)
111        .map(|p| p.value.clone())
112}
113
114fn set_metadata(props: &mut Vec<StringStringEntryProto>, key: &str, value: &str) {
115    if let Some(existing) = props.iter_mut().find(|p| p.key == key) {
116        existing.value = value.to_string();
117        return;
118    }
119    props.push(StringStringEntryProto {
120        key: key.to_string(),
121        value: value.to_string(),
122    });
123}
124
125// ──── inventory-published GateContract ─────────────────
126
127/// Insertion contract for the deadline gate. Every node carrying
128/// `deadline_ns` must have the sibling `__trigger_deadline` input
129/// slot AND a matching `DeadlineCheck` writing to that slot —
130/// presence-only checks would miss partial runs.
131struct DeadlineCheckContract;
132
133impl crate::gate_contract::GateContract for DeadlineCheckContract {
134    fn name(&self) -> &'static str {
135        "DeadlineCheck"
136    }
137
138    fn assert_inserted(
139        &self,
140        sub_graph: &bb_ir::proto::onnx::GraphProto,
141    ) -> Result<(), CompileError> {
142        for node in &sub_graph.node {
143            // The DeadlineCheck gate itself carries `deadline_ns`
144            // (it propagates the value to the runtime check) AND
145            // is marked GATED — it never needs a sibling trigger
146            // slot of its own. Skip it.
147            if node.op_type == DEADLINE_OP_TYPE || metadata_value(node, GATED_KEY).is_some() {
148                continue;
149            }
150            if node.attribute.iter().any(|a| a.name == ATTR_DEADLINE_NS)
151                && !node
152                    .input
153                    .iter()
154                    .any(|i| i.ends_with(TRIGGER_DEADLINE_SUFFIX))
155            {
156                return Err(CompileError::RuntimeIncomplete {
157                    missing: format!(
158                        "DeadlineCheck not inserted upstream of `{}` (no sibling `{TRIGGER_DEADLINE_SUFFIX}` input)",
159                        node.name,
160                    ),
161                });
162            }
163        }
164        Ok(())
165    }
166}
167
168static DEADLINE_CHECK_CONTRACT: DeadlineCheckContract = DeadlineCheckContract;
169
170bb_ir::registry::inventory::submit! {
171    crate::gate_contract::GateContractRegistration {
172        contract: &DEADLINE_CHECK_CONTRACT,
173    }
174}
175