bb_ops/syscalls/coordination/
mod.rs1pub mod deadline_check;
9
10use bb_runtime::atomic::DispatchResult;
11use bb_runtime::bus::OpError;
12
13pub fn link_force() {
15 use std::hint::black_box;
16 black_box(invoke_limit_acquire as usize);
17 black_box(invoke_limit_release as usize);
18 black_box(invoke_any as usize);
19 black_box(invoke_gate as usize);
20 black_box(invoke_serialize_enqueue as usize);
21 black_box(invoke_serialize_dequeue as usize);
22 black_box(invoke_correlate_tag as usize);
23 black_box(invoke_hold_stash as usize);
24 black_box(invoke_hold_flush as usize);
25 black_box(deadline_check::invoke as usize);
26}
27use bb_ir::proto::onnx::NodeProto;
28use bb_runtime::runtime::RuntimeResourceRef;
29use bb_runtime::slot_value::SlotValue;
30use bb_runtime::syscall::values::{BytesValue, CorrelationTokenValue, TriggerValue};
31
/// Operator domain string under which every op in this module is registered
/// (it is the `domain` field of each `inventory::submit!` entry below).
const DOMAIN: &str = "ai.bytesandbrains.syscall";
33
34pub struct LimitAcquireOp;
38
39pub fn invoke_limit_acquire(
41 node: &NodeProto,
42 _inputs: &[(&str, &dyn SlotValue)],
43 ctx: &mut RuntimeResourceRef<'_>,
44) -> Result<DispatchResult, OpError> {
45 let name = node
46 .attribute
47 .iter()
48 .find(|a| a.name == "name")
49 .map(|a| String::from_utf8_lossy(&a.s).into_owned())
50 .unwrap_or_default();
51 let n = node
52 .attribute
53 .iter()
54 .find(|a| a.name == "n")
55 .map(|a| a.i as u32)
56 .unwrap_or(1);
57 if ctx.peers.gate.acquire(&name, n) {
58 Ok(DispatchResult::Immediate(vec![(
59 "trigger".to_string(),
60 Box::new(TriggerValue),
61 )]))
62 } else {
63 Ok(DispatchResult::Immediate(vec![]))
64 }
65}
66
67pub struct LimitReleaseOp;
69
70pub fn invoke_limit_release(
72 node: &NodeProto,
73 _inputs: &[(&str, &dyn SlotValue)],
74 ctx: &mut RuntimeResourceRef<'_>,
75) -> Result<DispatchResult, OpError> {
76 let name = node
77 .attribute
78 .iter()
79 .find(|a| a.name == "name")
80 .map(|a| String::from_utf8_lossy(&a.s).into_owned())
81 .unwrap_or_default();
82 ctx.peers.gate.release(&name);
83 Ok(DispatchResult::Immediate(vec![]))
84}
85
86pub struct AnyOp;
90
91pub fn invoke_any(
97 node: &NodeProto,
98 inputs: &[(&str, &dyn SlotValue)],
99 ctx: &mut RuntimeResourceRef<'_>,
100) -> Result<DispatchResult, OpError> {
101 let Some((_, first)) = inputs.first() else {
102 return Ok(DispatchResult::Immediate(vec![]));
103 };
104 let group = node
105 .attribute
106 .iter()
107 .find(|a| a.name == "group")
108 .map(|a| String::from_utf8_lossy(&a.s).into_owned())
109 .unwrap_or_default();
110 if !group.is_empty() && !ctx.syscall.any_fired_groups.insert(group) {
113 return Ok(DispatchResult::Immediate(vec![]));
115 }
116 Ok(DispatchResult::Immediate(vec![(
117 "value".to_string(),
118 first.clone_boxed(),
119 )]))
120}
121
122pub struct GateOp;
126
127pub fn invoke_gate(
131 _node: &NodeProto,
132 inputs: &[(&str, &dyn SlotValue)],
133 _ctx: &mut RuntimeResourceRef<'_>,
134) -> Result<DispatchResult, OpError> {
135 let Some((_, value)) = inputs.first() else {
136 return Err(OpError {
137 detail: "Gate requires value input".to_string(),
138 ..Default::default()
139 });
140 };
141 Ok(DispatchResult::Immediate(vec![(
142 "value".to_string(),
143 value.clone_boxed(),
144 )]))
145}
146
147pub struct SerializeEnqueueOp;
151
152pub fn invoke_serialize_enqueue(
154 node: &NodeProto,
155 inputs: &[(&str, &dyn SlotValue)],
156 ctx: &mut RuntimeResourceRef<'_>,
157) -> Result<DispatchResult, OpError> {
158 let queue = node
159 .attribute
160 .iter()
161 .find(|a| a.name == "queue")
162 .map(|a| String::from_utf8_lossy(&a.s).into_owned())
163 .unwrap_or_default();
164 let bytes = crate::syscalls::first_input_optional_bytes("Serialize.Enqueue", inputs)?
165 .unwrap_or_default();
166 ctx.syscall.serialize_queue.enqueue(&queue, bytes);
167 Ok(DispatchResult::Immediate(vec![(
168 "trigger".to_string(),
169 Box::new(TriggerValue),
170 )]))
171}
172
173pub struct SerializeDequeueOp;
175
176pub fn invoke_serialize_dequeue(
178 node: &NodeProto,
179 _inputs: &[(&str, &dyn SlotValue)],
180 ctx: &mut RuntimeResourceRef<'_>,
181) -> Result<DispatchResult, OpError> {
182 let queue = node
183 .attribute
184 .iter()
185 .find(|a| a.name == "queue")
186 .map(|a| String::from_utf8_lossy(&a.s).into_owned())
187 .unwrap_or_default();
188 let Some(bytes) = ctx.syscall.serialize_queue.dequeue(&queue) else {
189 return Ok(DispatchResult::Immediate(vec![]));
190 };
191 Ok(DispatchResult::Immediate(vec![(
192 "value".to_string(),
193 Box::new(BytesValue(bytes)),
194 )]))
195}
196
197pub struct CorrelateTagOp;
201
202pub fn invoke_correlate_tag(
204 _node: &NodeProto,
205 _inputs: &[(&str, &dyn SlotValue)],
206 ctx: &mut RuntimeResourceRef<'_>,
207) -> Result<DispatchResult, OpError> {
208 let token = ctx.net.requests.mint_token();
209 Ok(DispatchResult::Immediate(vec![(
210 "token".to_string(),
211 Box::new(CorrelationTokenValue(token.as_u64())),
212 )]))
213}
214
215pub struct HoldStashOp;
219
220pub fn invoke_hold_stash(
222 node: &NodeProto,
223 inputs: &[(&str, &dyn SlotValue)],
224 ctx: &mut RuntimeResourceRef<'_>,
225) -> Result<DispatchResult, OpError> {
226 let slot = node
227 .attribute
228 .iter()
229 .find(|a| a.name == "slot")
230 .map(|a| String::from_utf8_lossy(&a.s).into_owned())
231 .unwrap_or_default();
232 let bytes =
233 crate::syscalls::first_input_optional_bytes("Hold.Stash", inputs)?.unwrap_or_default();
234 ctx.syscall.hold_table.stash(&slot, bytes);
235 Ok(DispatchResult::Immediate(vec![]))
236}
237
238pub struct HoldFlushOp;
240
241pub fn invoke_hold_flush(
243 node: &NodeProto,
244 _inputs: &[(&str, &dyn SlotValue)],
245 ctx: &mut RuntimeResourceRef<'_>,
246) -> Result<DispatchResult, OpError> {
247 let slot = node
248 .attribute
249 .iter()
250 .find(|a| a.name == "slot")
251 .map(|a| String::from_utf8_lossy(&a.s).into_owned())
252 .unwrap_or_default();
253 let Some(bytes) = ctx.syscall.hold_table.flush(&slot) else {
254 return Ok(DispatchResult::Immediate(vec![]));
255 };
256 Ok(DispatchResult::Immediate(vec![(
257 "value".to_string(),
258 Box::new(BytesValue(bytes)),
259 )]))
260}
261
262#[cfg(test)]
263#[path = "tests.rs"]
264mod tests;
265
266use bb_runtime::registry::OpRegistration as _BbOpsSyscallReg;
267
268inventory::submit! {
269 _BbOpsSyscallReg {
270 domain: DOMAIN,
271 op_type: "Limit.Acquire",
272 invoke: invoke_limit_acquire,
273 kind: bb_runtime::registry::RegistrationKind::Syscall,
274 }
275}
276
277inventory::submit! {
278 _BbOpsSyscallReg {
279 domain: DOMAIN,
280 op_type: "Limit.Release",
281 invoke: invoke_limit_release,
282 kind: bb_runtime::registry::RegistrationKind::Syscall,
283 }
284}
285
286inventory::submit! {
287 _BbOpsSyscallReg {
288 domain: DOMAIN,
289 op_type: "Any",
290 invoke: invoke_any,
291 kind: bb_runtime::registry::RegistrationKind::Syscall,
292 }
293}
294
295inventory::submit! {
296 _BbOpsSyscallReg {
297 domain: DOMAIN,
298 op_type: "Gate",
299 invoke: invoke_gate,
300 kind: bb_runtime::registry::RegistrationKind::Syscall,
301 }
302}
303
304inventory::submit! {
305 _BbOpsSyscallReg {
306 domain: DOMAIN,
307 op_type: "Serialize.Enqueue",
308 invoke: invoke_serialize_enqueue,
309 kind: bb_runtime::registry::RegistrationKind::Syscall,
310 }
311}
312
313inventory::submit! {
314 _BbOpsSyscallReg {
315 domain: DOMAIN,
316 op_type: "Serialize.Dequeue",
317 invoke: invoke_serialize_dequeue,
318 kind: bb_runtime::registry::RegistrationKind::Syscall,
319 }
320}
321
322inventory::submit! {
323 _BbOpsSyscallReg {
324 domain: DOMAIN,
325 op_type: "CorrelateTag",
326 invoke: invoke_correlate_tag,
327 kind: bb_runtime::registry::RegistrationKind::Syscall,
328 }
329}
330
331inventory::submit! {
332 _BbOpsSyscallReg {
333 domain: DOMAIN,
334 op_type: "Hold.Stash",
335 invoke: invoke_hold_stash,
336 kind: bb_runtime::registry::RegistrationKind::Syscall,
337 }
338}
339
340inventory::submit! {
341 _BbOpsSyscallReg {
342 domain: DOMAIN,
343 op_type: "Hold.Flush",
344 invoke: invoke_hold_flush,
345 kind: bb_runtime::registry::RegistrationKind::Syscall,
346 }
347}