Skip to main content

bb_ops/syscalls/coordination/
mod.rs

//! Coordination ops - Limit, Any, Gate, Serialize, CorrelateTag,
//! Hold, DeadlineCheck.
//!
//! `DeadlineCheck` lives in its own sub-module so the compiler
//! pass that inserts it can re-export `OP_TYPE` + attribute-name
//! constants from one place.
7
8pub mod deadline_check;
9
10use bb_runtime::atomic::DispatchResult;
11use bb_runtime::bus::OpError;
12
13/// Linker-anchor - see `bb_ops::link_force` for details.
14pub fn link_force() {
15    use std::hint::black_box;
16    black_box(invoke_limit_acquire as usize);
17    black_box(invoke_limit_release as usize);
18    black_box(invoke_any as usize);
19    black_box(invoke_gate as usize);
20    black_box(invoke_serialize_enqueue as usize);
21    black_box(invoke_serialize_dequeue as usize);
22    black_box(invoke_correlate_tag as usize);
23    black_box(invoke_hold_stash as usize);
24    black_box(invoke_hold_flush as usize);
25    black_box(deadline_check::invoke as usize);
26}
27use bb_ir::proto::onnx::NodeProto;
28use bb_runtime::runtime::RuntimeResourceRef;
29use bb_runtime::slot_value::SlotValue;
30use bb_runtime::syscall::values::{BytesValue, CorrelationTokenValue, TriggerValue};
31
/// Operator domain string passed as the `domain` field of each op
/// registration submitted from this file.
const DOMAIN: &str = "ai.bytesandbrains.syscall";
33
34// --- Limit.Acquire / Limit.Release ---------------------------------
35
36/// Marker struct for dispatch_table TypeId keying.
37pub struct LimitAcquireOp;
38
39/// `Limit.Acquire(trigger, name: string, n: int) → Trigger | Sink`.
40pub fn invoke_limit_acquire(
41    node: &NodeProto,
42    _inputs: &[(&str, &dyn SlotValue)],
43    ctx: &mut RuntimeResourceRef<'_>,
44) -> Result<DispatchResult, OpError> {
45    let name = node
46        .attribute
47        .iter()
48        .find(|a| a.name == "name")
49        .map(|a| String::from_utf8_lossy(&a.s).into_owned())
50        .unwrap_or_default();
51    let n = node
52        .attribute
53        .iter()
54        .find(|a| a.name == "n")
55        .map(|a| a.i as u32)
56        .unwrap_or(1);
57    if ctx.peers.gate.acquire(&name, n) {
58        Ok(DispatchResult::Immediate(vec![(
59            "trigger".to_string(),
60            Box::new(TriggerValue),
61        )]))
62    } else {
63        Ok(DispatchResult::Immediate(vec![]))
64    }
65}
66
67/// Marker struct for dispatch_table TypeId keying.
68pub struct LimitReleaseOp;
69
70/// `Limit.Release(trigger, name: string) → Sink`.
71pub fn invoke_limit_release(
72    node: &NodeProto,
73    _inputs: &[(&str, &dyn SlotValue)],
74    ctx: &mut RuntimeResourceRef<'_>,
75) -> Result<DispatchResult, OpError> {
76    let name = node
77        .attribute
78        .iter()
79        .find(|a| a.name == "name")
80        .map(|a| String::from_utf8_lossy(&a.s).into_owned())
81        .unwrap_or_default();
82    ctx.peers.gate.release(&name);
83    Ok(DispatchResult::Immediate(vec![]))
84}
85
86// --- Any -----------------------------------------------------------
87
88/// Marker struct for dispatch_table TypeId keying.
89pub struct AnyOp;
90
91/// `Any(inputs: variadic, group: string) → value` per IR_AND_DSL.md
92/// §5a. First-arrival semantics: the first input to arrive in a
93/// group emits its value; subsequent arrivals within the same
94/// `group` are absorbed (Immediate `vec![]`) so downstream
95/// consumers don't re-fire.
96pub fn invoke_any(
97    node: &NodeProto,
98    inputs: &[(&str, &dyn SlotValue)],
99    ctx: &mut RuntimeResourceRef<'_>,
100) -> Result<DispatchResult, OpError> {
101    let Some((_, first)) = inputs.first() else {
102        return Ok(DispatchResult::Immediate(vec![]));
103    };
104    let group = node
105        .attribute
106        .iter()
107        .find(|a| a.name == "group")
108        .map(|a| String::from_utf8_lossy(&a.s).into_owned())
109        .unwrap_or_default();
110    // Empty `group` → always fire (no latch). Non-empty → gate on
111    // the per-group latch so a group fires at most once per Node.
112    if !group.is_empty() && !ctx.syscall.any_fired_groups.insert(group) {
113        // Already fired this group - absorb.
114        return Ok(DispatchResult::Immediate(vec![]));
115    }
116    Ok(DispatchResult::Immediate(vec![(
117        "value".to_string(),
118        first.clone_boxed(),
119    )]))
120}
121
122// --- Gate ----------------------------------------------------------
123
124/// Marker struct for dispatch_table TypeId keying.
125pub struct GateOp;
126
127/// `Gate(value, trigger) → value`. Releases the value once trigger
128/// arrives (both inputs are required by all_inputs_ready before
129/// the engine invokes us).
130pub fn invoke_gate(
131    _node: &NodeProto,
132    inputs: &[(&str, &dyn SlotValue)],
133    _ctx: &mut RuntimeResourceRef<'_>,
134) -> Result<DispatchResult, OpError> {
135    let Some((_, value)) = inputs.first() else {
136        return Err(OpError {
137            detail: "Gate requires value input".to_string(),
138            ..Default::default()
139        });
140    };
141    Ok(DispatchResult::Immediate(vec![(
142        "value".to_string(),
143        value.clone_boxed(),
144    )]))
145}
146
147// --- Serialize.Enqueue / Dequeue -----------------------------------
148
149/// Marker struct for dispatch_table TypeId keying.
150pub struct SerializeEnqueueOp;
151
152/// `Serialize.Enqueue(value, queue: string) → Trigger`.
153pub fn invoke_serialize_enqueue(
154    node: &NodeProto,
155    inputs: &[(&str, &dyn SlotValue)],
156    ctx: &mut RuntimeResourceRef<'_>,
157) -> Result<DispatchResult, OpError> {
158    let queue = node
159        .attribute
160        .iter()
161        .find(|a| a.name == "queue")
162        .map(|a| String::from_utf8_lossy(&a.s).into_owned())
163        .unwrap_or_default();
164    let bytes = crate::syscalls::first_input_optional_bytes("Serialize.Enqueue", inputs)?
165        .unwrap_or_default();
166    ctx.syscall.serialize_queue.enqueue(&queue, bytes);
167    Ok(DispatchResult::Immediate(vec![(
168        "trigger".to_string(),
169        Box::new(TriggerValue),
170    )]))
171}
172
173/// Marker struct for dispatch_table TypeId keying.
174pub struct SerializeDequeueOp;
175
176/// `Serialize.Dequeue(trigger, queue: string) → value`.
177pub fn invoke_serialize_dequeue(
178    node: &NodeProto,
179    _inputs: &[(&str, &dyn SlotValue)],
180    ctx: &mut RuntimeResourceRef<'_>,
181) -> Result<DispatchResult, OpError> {
182    let queue = node
183        .attribute
184        .iter()
185        .find(|a| a.name == "queue")
186        .map(|a| String::from_utf8_lossy(&a.s).into_owned())
187        .unwrap_or_default();
188    let Some(bytes) = ctx.syscall.serialize_queue.dequeue(&queue) else {
189        return Ok(DispatchResult::Immediate(vec![]));
190    };
191    Ok(DispatchResult::Immediate(vec![(
192        "value".to_string(),
193        Box::new(BytesValue(bytes)),
194    )]))
195}
196
197// --- CorrelateTag --------------------------------------------------
198
199/// Marker struct for dispatch_table TypeId keying.
200pub struct CorrelateTagOp;
201
202/// `CorrelateTag(trigger) → token`.
203pub fn invoke_correlate_tag(
204    _node: &NodeProto,
205    _inputs: &[(&str, &dyn SlotValue)],
206    ctx: &mut RuntimeResourceRef<'_>,
207) -> Result<DispatchResult, OpError> {
208    let token = ctx.net.requests.mint_token();
209    Ok(DispatchResult::Immediate(vec![(
210        "token".to_string(),
211        Box::new(CorrelationTokenValue(token.as_u64())),
212    )]))
213}
214
215// --- Hold.Stash / Hold.Flush ---------------------------------------
216
217/// Marker struct for dispatch_table TypeId keying.
218pub struct HoldStashOp;
219
220/// `Hold.Stash(value, slot: string) → Sink`.
221pub fn invoke_hold_stash(
222    node: &NodeProto,
223    inputs: &[(&str, &dyn SlotValue)],
224    ctx: &mut RuntimeResourceRef<'_>,
225) -> Result<DispatchResult, OpError> {
226    let slot = node
227        .attribute
228        .iter()
229        .find(|a| a.name == "slot")
230        .map(|a| String::from_utf8_lossy(&a.s).into_owned())
231        .unwrap_or_default();
232    let bytes =
233        crate::syscalls::first_input_optional_bytes("Hold.Stash", inputs)?.unwrap_or_default();
234    ctx.syscall.hold_table.stash(&slot, bytes);
235    Ok(DispatchResult::Immediate(vec![]))
236}
237
238/// Marker struct for dispatch_table TypeId keying.
239pub struct HoldFlushOp;
240
241/// `Hold.Flush(trigger, slot: string) → value`.
242pub fn invoke_hold_flush(
243    node: &NodeProto,
244    _inputs: &[(&str, &dyn SlotValue)],
245    ctx: &mut RuntimeResourceRef<'_>,
246) -> Result<DispatchResult, OpError> {
247    let slot = node
248        .attribute
249        .iter()
250        .find(|a| a.name == "slot")
251        .map(|a| String::from_utf8_lossy(&a.s).into_owned())
252        .unwrap_or_default();
253    let Some(bytes) = ctx.syscall.hold_table.flush(&slot) else {
254        return Ok(DispatchResult::Immediate(vec![]));
255    };
256    Ok(DispatchResult::Immediate(vec![(
257        "value".to_string(),
258        Box::new(BytesValue(bytes)),
259    )]))
260}
261
262#[cfg(test)]
263#[path = "tests.rs"]
264mod tests;
265
266use bb_runtime::registry::OpRegistration as _BbOpsSyscallReg;
267
// --- Op registrations ----------------------------------------------
//
// One `inventory::submit!` entry per op implemented above. Each entry
// pairs an `op_type` string with its `invoke_*` entry point under
// `DOMAIN`, tagged as `RegistrationKind::Syscall`.

// `Limit.Acquire` → `invoke_limit_acquire`.
inventory::submit! {
    _BbOpsSyscallReg {
        domain: DOMAIN,
        op_type: "Limit.Acquire",
        invoke: invoke_limit_acquire,
        kind: bb_runtime::registry::RegistrationKind::Syscall,
    }
}

// `Limit.Release` → `invoke_limit_release`.
inventory::submit! {
    _BbOpsSyscallReg {
        domain: DOMAIN,
        op_type: "Limit.Release",
        invoke: invoke_limit_release,
        kind: bb_runtime::registry::RegistrationKind::Syscall,
    }
}

// `Any` → `invoke_any`.
inventory::submit! {
    _BbOpsSyscallReg {
        domain: DOMAIN,
        op_type: "Any",
        invoke: invoke_any,
        kind: bb_runtime::registry::RegistrationKind::Syscall,
    }
}

// `Gate` → `invoke_gate`.
inventory::submit! {
    _BbOpsSyscallReg {
        domain: DOMAIN,
        op_type: "Gate",
        invoke: invoke_gate,
        kind: bb_runtime::registry::RegistrationKind::Syscall,
    }
}

// `Serialize.Enqueue` → `invoke_serialize_enqueue`.
inventory::submit! {
    _BbOpsSyscallReg {
        domain: DOMAIN,
        op_type: "Serialize.Enqueue",
        invoke: invoke_serialize_enqueue,
        kind: bb_runtime::registry::RegistrationKind::Syscall,
    }
}

// `Serialize.Dequeue` → `invoke_serialize_dequeue`.
inventory::submit! {
    _BbOpsSyscallReg {
        domain: DOMAIN,
        op_type: "Serialize.Dequeue",
        invoke: invoke_serialize_dequeue,
        kind: bb_runtime::registry::RegistrationKind::Syscall,
    }
}

// `CorrelateTag` → `invoke_correlate_tag`.
inventory::submit! {
    _BbOpsSyscallReg {
        domain: DOMAIN,
        op_type: "CorrelateTag",
        invoke: invoke_correlate_tag,
        kind: bb_runtime::registry::RegistrationKind::Syscall,
    }
}

// `Hold.Stash` → `invoke_hold_stash`.
inventory::submit! {
    _BbOpsSyscallReg {
        domain: DOMAIN,
        op_type: "Hold.Stash",
        invoke: invoke_hold_stash,
        kind: bb_runtime::registry::RegistrationKind::Syscall,
    }
}

// `Hold.Flush` → `invoke_hold_flush`.
inventory::submit! {
    _BbOpsSyscallReg {
        domain: DOMAIN,
        op_type: "Hold.Flush",
        invoke: invoke_hold_flush,
        kind: bb_runtime::registry::RegistrationKind::Syscall,
    }
}