Skip to main content

bb_ops/syscalls/telemetry/
mod.rs

1//! App-emit + telemetry ops - AppEmit, AppNotify, Record,
2//! IncrMetric.
3//!
4//! `AppEmit` / `AppNotify` push onto `ctx.syscall.pending_app_events`;
5//! the Engine drains them into `EngineStep::AppEvent`.
6//! `Record` writes to `ctx.syscall.record_buffer`; `IncrMetric` bumps
7//! `ctx.syscall.counters`.
8
9use bb_ir::proto::onnx::NodeProto;
10use bb_runtime::atomic::DispatchResult;
11use bb_runtime::bus::{AppEvent, OpError, OpErrorKind};
12use bb_runtime::runtime::RuntimeResourceRef;
13use bb_runtime::slot_value::SlotValue;
14
15const DOMAIN: &str = "ai.bytesandbrains.syscall";
16
17fn read_name(node: &NodeProto) -> String {
18    node.attribute
19        .iter()
20        .find(|a| a.name == "name")
21        .map(|a| String::from_utf8_lossy(&a.s).into_owned())
22        .unwrap_or_default()
23}
24
/// Marker struct for dispatch_table TypeId keying.
///
/// Carries no data. The derives make the marker printable in diagnostics and
/// trivially copyable / default-constructible.
#[derive(Debug, Clone, Copy, Default)]
pub struct AppEmitOp;
27
28/// `AppEmit(value, name: string) → Sink`. Expects a `BytesValue`
29/// input - the emitter is a byte-level surface, so callers wrap
30/// their payload in `BytesValue` upstream.
31///
32/// The user-supplied `name` is validated against the framework's
33/// reserved-topic prefixes (`bb.`, `ai.bytesandbrains.`) via
34/// [`AppEvent::emit`] — a collision surfaces as
35/// `OpError::BadInput` so the engine doesn't ferry the impersonating
36/// publish to subscribers.
37pub fn invoke_app_emit(
38    node: &NodeProto,
39    inputs: &[(&str, &dyn SlotValue)],
40    ctx: &mut RuntimeResourceRef<'_>,
41) -> Result<DispatchResult, OpError> {
42    let name = read_name(node);
43    let value_bytes =
44        crate::syscalls::first_input_optional_bytes("AppEmit", inputs)?.unwrap_or_default();
45    let event = AppEvent::emit(name, value_bytes).map_err(|e| OpError {
46        kind: OpErrorKind::BadInput,
47        reason: "reserved_topic_prefix",
48        detail: e.to_string(),
49    })?;
50    ctx.syscall.pending_app_events.push(event);
51    Ok(DispatchResult::Immediate(vec![]))
52}
53
/// Marker struct for dispatch_table TypeId keying.
///
/// Carries no data. The derives make the marker printable in diagnostics and
/// trivially copyable / default-constructible.
#[derive(Debug, Clone, Copy, Default)]
pub struct AppNotifyOp;
56
57/// `AppNotify(trigger, name: string) → Sink`. See [`invoke_app_emit`]
58/// for the reserved-prefix validation rule that applies here too.
59pub fn invoke_app_notify(
60    node: &NodeProto,
61    _inputs: &[(&str, &dyn SlotValue)],
62    ctx: &mut RuntimeResourceRef<'_>,
63) -> Result<DispatchResult, OpError> {
64    let name = read_name(node);
65    let event = AppEvent::notify(name).map_err(|e| OpError {
66        kind: OpErrorKind::BadInput,
67        reason: "reserved_topic_prefix",
68        detail: e.to_string(),
69    })?;
70    ctx.syscall.pending_app_events.push(event);
71    Ok(DispatchResult::Immediate(vec![]))
72}
73
/// Marker struct for dispatch_table TypeId keying.
///
/// Carries no data. The derives make the marker printable in diagnostics and
/// trivially copyable / default-constructible.
#[derive(Debug, Clone, Copy, Default)]
pub struct RecordOp;
76
77/// `Record(value, name: string) → Sink`. Expects a `BytesValue`
78/// input - the record buffer stores byte payloads.
79pub fn invoke_record(
80    node: &NodeProto,
81    inputs: &[(&str, &dyn SlotValue)],
82    ctx: &mut RuntimeResourceRef<'_>,
83) -> Result<DispatchResult, OpError> {
84    let name = read_name(node);
85    let bytes = crate::syscalls::first_input_optional_bytes("Record", inputs)?.unwrap_or_default();
86    ctx.syscall.record_buffer.record(&name, bytes);
87    Ok(DispatchResult::Immediate(vec![]))
88}
89
/// Marker struct for dispatch_table TypeId keying.
///
/// Carries no data. The derives make the marker printable in diagnostics and
/// trivially copyable / default-constructible.
#[derive(Debug, Clone, Copy, Default)]
pub struct IncrMetricOp;
92
93/// `IncrMetric(trigger, name: string, delta: int) → Sink`.
94pub fn invoke_incr_metric(
95    node: &NodeProto,
96    _inputs: &[(&str, &dyn SlotValue)],
97    ctx: &mut RuntimeResourceRef<'_>,
98) -> Result<DispatchResult, OpError> {
99    let name = read_name(node);
100    let delta = node
101        .attribute
102        .iter()
103        .find(|a| a.name == "delta")
104        .map(|a| a.i)
105        .unwrap_or(1) as u64;
106    *ctx.syscall.counters.entry(name).or_insert(0) += delta;
107    Ok(DispatchResult::Immediate(vec![]))
108}
109
110/// Linker-anchor - see `bb_ops::link_force` for details.
111pub fn link_force() {
112    use std::hint::black_box;
113    black_box(invoke_app_emit as usize);
114    black_box(invoke_app_notify as usize);
115    black_box(invoke_record as usize);
116    black_box(invoke_incr_metric as usize);
117}
118
119#[cfg(test)]
120#[path = "tests.rs"]
121mod tests;
122
123use bb_runtime::registry::OpRegistration as _BbOpsSyscallReg;
124
125inventory::submit! {
126    _BbOpsSyscallReg {
127        domain: DOMAIN,
128        op_type: "AppEmit",
129        invoke: invoke_app_emit,
130        kind: bb_runtime::registry::RegistrationKind::Syscall,
131    }
132}
133
134inventory::submit! {
135    _BbOpsSyscallReg {
136        domain: DOMAIN,
137        op_type: "AppNotify",
138        invoke: invoke_app_notify,
139        kind: bb_runtime::registry::RegistrationKind::Syscall,
140    }
141}
142
143inventory::submit! {
144    _BbOpsSyscallReg {
145        domain: DOMAIN,
146        op_type: "Record",
147        invoke: invoke_record,
148        kind: bb_runtime::registry::RegistrationKind::Syscall,
149    }
150}
151
152inventory::submit! {
153    _BbOpsSyscallReg {
154        domain: DOMAIN,
155        op_type: "IncrMetric",
156        invoke: invoke_incr_metric,
157        kind: bb_runtime::registry::RegistrationKind::Syscall,
158    }
159}