Skip to main content

bb_ops/syscalls/gates/
dedup_rx.rs

1//! `DedupGateRx` syscall op - gate inserted by the compiler pass
2//! `bb-compiler/src/insert_dedup_gate_rx.rs` at the head of the RX
3//! gate chain. Consults [`bb_runtime::framework::InboundDedup`] and
4//! drops envelopes whose payload hash has been seen recently.
5//!
6//! The hash is computed from the input value's wire bytes (via
7//! `SlotValue::to_wire_bytes`) so that the same payload from the
8//! same peer collapses to a single delivery.
9
10use bb_ir::proto::onnx::NodeProto;
11use bb_runtime::atomic::DispatchResult;
12use bb_runtime::bus::OpError;
13use bb_runtime::runtime::RuntimeResourceRef;
14use bb_runtime::slot_value::SlotValue;
15
/// Domain half of the `(domain, op_type)` key this op registers under.
pub const DOMAIN: &str = "ai.bytesandbrains.syscall";

/// Op-type half of the `(domain, op_type)` registration key.
pub const OP_TYPE: &str = "DedupGateRx";

/// Zero-sized marker type naming this op, for use as the type argument
/// in `register_syscall::<DedupGateRxOp>`.
pub struct DedupGateRxOp;
23
24/// Invoke fn - hashes the input's wire bytes, records the hash in
25/// `InboundDedup`, and forwards the input on first-arrival. Drops
26/// with a `duplicate` reason on a repeat.
27pub fn invoke(
28    _node: &NodeProto,
29    inputs: &[(&str, &dyn SlotValue)],
30    ctx: &mut RuntimeResourceRef<'_>,
31) -> Result<DispatchResult, OpError> {
32    let (_, value) = inputs.first().ok_or_else(|| OpError {
33        detail: "DedupGateRx requires one input".into(),
34        ..Default::default()
35    })?;
36
37    let bytes = value.to_wire_bytes().map_err(|e| OpError {
38        kind: bb_runtime::bus::OpErrorKind::ExecutionFailed,
39        reason: "wire_encode_failed",
40        detail: format!("DedupGateRx: input wire encode failed: {e}"),
41    })?;
42    let hash = fnv1a_64(&bytes);
43    if ctx.net.dedup.record(hash) {
44        return Err(OpError {
45            detail: "DedupGateRx dropped envelope: reason=duplicate".into(),
46            ..Default::default()
47        });
48    }
49
50    Ok(DispatchResult::Immediate(vec![(
51        "value".to_string(),
52        value.clone_boxed(),
53    )]))
54}
55
/// Computes the 64-bit FNV-1a hash of `bytes`.
///
/// Starting from the FNV offset basis, each byte is XOR-ed into the
/// running state, which is then multiplied by the FNV prime with
/// wrapping arithmetic. An empty slice yields the offset basis itself.
///
/// Meant to mirror the wire-type hash in
/// `bb_ir::wire::compute_wire_hash` so dedup behavior stays
/// predictable across versions.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    bytes
        .iter()
        .fold(OFFSET_BASIS, |acc, &byte| (acc ^ u64::from(byte)).wrapping_mul(PRIME))
}
69
70
71use bb_runtime::registry::OpRegistration as _BbOpsSyscallReg;
72
73inventory::submit! {
74    _BbOpsSyscallReg {
75        domain: DOMAIN,
76        op_type: OP_TYPE,
77        invoke,
78        kind: bb_runtime::registry::RegistrationKind::Syscall,
79    }
80}