bb_runtime/ingress.rs
//! Lock-free MPMC ingress queue.
//!
//! External tasks (transport, host invocations, off-thread
//! completions) push `IngressEvent`s onto the queue; the engine
//! drains them on its next poll. The queue is lock-free via
//! `concurrent-queue` v2; the engine sleeps on an `AtomicWaker` until
//! a producer wakes it.
//!
//! `Arc<IngressQueue>` is shared between the engine and any number
//! of external producer tasks running on different threads.
//!
//! Per ENGINE.md §2.2 + §16, the queue is BOUNDED, with a default
//! capacity of `bus_capacity * 4` (= 4096 when bus_capacity uses the
//! spec default of 1024). On overflow, `push` hands the rejected event
//! back as `Err(IngressEvent)` so the transport adapter can choose to
//! retry, drop it with a metric, or escalate back-pressure to its
//! upstream. Every overflow rejection increments a counter that is
//! readable via `IngressQueue::dropped_overflow()`.
19
20use std::ops::Deref;
21use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
22use std::sync::Arc;
23use std::task::Waker;
24
25use atomic_waker::AtomicWaker;
26use concurrent_queue::{ConcurrentQueue, PushError};
27
28use crate::bus::{AppIngressErrorKind, AppIngressSource};
29use crate::ids::CommandId;
30
/// Hard cap on the detail string handed to `fail()` (4 KiB). Oversized
/// details are truncated instead of rejected, so the host's
/// `Display`-rendered failure message is always delivered.
pub const COMPLETION_DETAIL_CAP: usize = 4096;
35
/// Spec-default bus capacity (ENGINE.md §16). By default the ingress
/// queue is sized at four times this value.
const DEFAULT_BUS_CAPACITY: usize = 1024;

/// Default ingress-queue capacity: four times the bus capacity, per
/// ENGINE.md §2.2 (4096 with the spec-default bus capacity).
pub const DEFAULT_INGRESS_CAPACITY: usize = 4 * DEFAULT_BUS_CAPACITY;
42
43/// External-event variants pushed to the ingress queue per
44/// `docs/ENGINE.md` §6 entry points.
45#[derive(Debug)]
46pub enum IngressEvent {
47 /// Inbound wire envelope from the transport layer, attributed
48 /// to a source peer. The engine calls
49 /// `PeerGovernor::check_inbound(src_peer)` on ingress; blocked or
50 /// non-allowlisted peers are dropped before any slot is written,
51 /// surfacing as `EngineStep::PeerBlocked`.
52 EnvelopeFrom {
53 /// Peer the envelope arrived from.
54 src_peer: crate::ids::PeerId,
55 /// The envelope payload.
56 envelope: crate::envelope::WireEnvelope,
57 /// Transport-observed source address, when the adapter can
58 /// supply it (e.g. NAT-translated remote endpoint, dialer's
59 /// observed multiaddr). The receiver merges this into its
60 /// AddressBook entry for `src_peer` so reflexive-address
61 /// discovery composes with the sender-claimed
62 /// `envelope.src_peer_addresses` list. `None` means the
63 /// transport didn't surface an observed address.
64 src_observed_address: Option<crate::framework::Address>,
65 },
66
67 /// Host pushed an app event onto a Module input.
68 AppEvent {
69 /// Target Module's name.
70 module_name: String,
71 /// Module input port name.
72 input_name: String,
73 /// Encoded value payload.
74 value_bytes: Vec<u8>,
75 },
76
77 /// External timer maturity signal (used when an off-thread
78 /// scheduler drives the engine).
79 TimerMatured {
80 /// Maturity timestamp (nanoseconds).
81 at_ns: u64,
82 },
83
84 /// Explicit Module invocation from host.
85 Invoke {
86 /// Target Module's name.
87 module_name: String,
88 /// `(input_name, value_bytes)` pairs.
89 inputs: Vec<(String, Vec<u8>)>,
90 /// `ExecId` allocated by `Node::invoke` so the host can
91 /// correlate `EngineStep::AppEvent` / `OpCompleted` /
92 /// `AsyncSuspended` outputs back to the originating call.
93 exec_id: crate::ids::ExecId,
94 },
95
96 /// External (off-thread) async completion landing back at the
97 /// engine.
98 Completion {
99 /// The `CommandId` being fulfilled.
100 cmd_id: CommandId,
101 /// Encoded output payloads.
102 results: Vec<Vec<u8>>,
103 },
104
105 /// Async completion FAILURE landing back at the engine.
106 /// Distinct from `Completion`: `CompletionSink::fail` mints
107 /// this variant directly so `handle_completion_failed` can
108 /// route to the typed `OpFailed` surface — the host sees a
109 /// real error, not a success-bytes masquerade.
110 CompletionFailed {
111 /// The `CommandId` whose await failed.
112 cmd_id: CommandId,
113 /// Human-readable failure detail; the runtime wraps it
114 /// into `bus::OpError` on the engine side.
115 detail: String,
116 },
117
118 /// Transport-side send-outcome failure surfaced
119 /// by an adapter (libp2p, sim, etc.) when the network NAKed an
120 /// outbound envelope or its delivery deadline elapsed without
121 /// an ACK. Distinct from `CompletionFailed` (which covers
122 /// off-thread compute completion); this variant covers
123 /// transport-layer delivery failure.
124 SendFailed {
125 /// The wire request id of the failed outbound envelope.
126 wire_req_id: u64,
127 /// The destination peer that NAKed or timed out (raw
128 /// multihash bytes so the engine can reconstruct
129 /// `PeerId::from_bytes(&peer)`).
130 peer: Vec<u8>,
131 /// Stable diagnostic label (e.g. `"nak"`, `"timeout"`,
132 /// `"network_unreachable"`). Adapters pick from a fixed
133 /// vocabulary so consumers can match on the label.
134 reason: &'static str,
135 },
136
137 /// Off-thread application-ingress failure (currently only
138 /// `CompletionSink::complete` exceeding the per-completion result
139 /// cap). The engine drains this variant and publishes a matching
140 /// `InfraEvent::AppIngressError` on the bus so subscribers see the
141 /// rejection. The synchronous `Node::deliver_event` / `Node::invoke`
142 /// path publishes directly with `&mut bus` access; this variant is
143 /// the cross-thread bridge for sinks that don't hold a bus
144 /// reference. The Component observes an async-op timeout in place
145 /// of the dropped completion.
146 AppIngressError {
147 /// Which application-side entry point raised the failure.
148 source: AppIngressSource,
149 /// Bytes the boundary was asked to admit.
150 byte_count: usize,
151 /// Which failure mode fired.
152 kind: AppIngressErrorKind,
153 },
154}
155
156impl IngressEvent {
157 /// Construct an `EnvelopeFrom` for the in-process router common
158 /// case where the transport carries no NAT and the observed
159 /// address is the sender's PeerId-tagged multiaddr. Test buses
160 /// and the in-process router call this so observed-address
161 /// propagation exercises the same merge path as a real
162 /// transport's reflexive surface.
163 pub fn from_in_process(
164 src_peer: crate::ids::PeerId,
165 envelope: crate::envelope::WireEnvelope,
166 ) -> Self {
167 Self::EnvelopeFrom {
168 src_peer,
169 envelope,
170 src_observed_address: Some(crate::framework::Address::empty().p2p(src_peer)),
171 }
172 }
173}
174
175/// Lock-free MPMC ingress queue + waker. Multiple external
176/// producers may `push` concurrently; the engine's single consumer
177/// drains via `drain_all` on each poll cycle.
178pub struct IngressQueue {
179 queue: ConcurrentQueue<IngressEvent>,
180 waker: AtomicWaker,
181 dropped_overflow: AtomicU64,
182 /// Per-`CompletionSink::complete` result cap sourced from
183 /// `NodeConfig::max_completion_result_bytes` via
184 /// `apply_config_caps`. Defaults to `usize::MAX` (no cap) so
185 /// constructions outside the `Node::new` → `apply_config_caps`
186 /// path (test fixtures, snapshot reseed) behave like the
187 /// pre-cap world.
188 completion_result_cap: AtomicUsize,
189}
190
191impl IngressQueue {
192 /// Construct a fresh ingress queue with the default capacity
193 /// ([`DEFAULT_INGRESS_CAPACITY`]).
194 pub fn new() -> Self {
195 Self::with_capacity(DEFAULT_INGRESS_CAPACITY)
196 }
197
198 /// Construct a fresh ingress queue with the supplied bounded
199 /// capacity. Per ENGINE.md §2.2 the canonical sizing is
200 /// `bus_capacity * 4`; pass the host's chosen bus_capacity
201 /// multiplied by 4 to match.
202 pub fn with_capacity(capacity: usize) -> Self {
203 Self {
204 queue: ConcurrentQueue::bounded(capacity),
205 waker: AtomicWaker::new(),
206 dropped_overflow: AtomicU64::new(0),
207 completion_result_cap: AtomicUsize::new(usize::MAX),
208 }
209 }
210
211 /// Install the per-`CompletionSink::complete` result cap.
212 /// `Engine::apply_config_caps` calls this from
213 /// `NodeConfig::max_completion_result_bytes` so off-thread
214 /// completions see the configured cap without the sink needing a
215 /// reference to `NodeConfig`.
216 pub(crate) fn set_completion_result_cap(&self, cap: usize) {
217 self.completion_result_cap.store(cap, Ordering::Relaxed);
218 }
219
220 /// Per-`complete()` result-byte cap. Defaults to `usize::MAX`
221 /// when not configured; `apply_config_caps` reseeds it from
222 /// `NodeConfig::max_completion_result_bytes`.
223 pub fn completion_result_cap(&self) -> usize {
224 self.completion_result_cap.load(Ordering::Relaxed)
225 }
226
227 /// Push an event. On success returns `Ok(())` and wakes the
228 /// engine if it's sleeping. On a full queue the event comes
229 /// back in `Err(_)` and the `dropped_overflow` counter is
230 /// incremented; transport adapters decide whether to retry,
231 /// drop with a metric, or escalate as back-pressure. The
232 /// `IngressEvent` Err variant is large (carries a
233 /// `WireEnvelope` with multihash PeerIds); transport adapters
234 /// already box or re-queue, so the cost lives at the boundary.
235 #[allow(clippy::result_large_err)]
236 pub fn push(&self, event: IngressEvent) -> Result<(), IngressEvent> {
237 match self.queue.push(event) {
238 Ok(()) => {
239 self.waker.wake();
240 Ok(())
241 }
242 Err(PushError::Full(ev)) => {
243 self.dropped_overflow.fetch_add(1, Ordering::Relaxed);
244 Err(ev)
245 }
246 Err(PushError::Closed(ev)) => Err(ev),
247 }
248 }
249
250 /// Drain all available events. Called by the engine on each
251 /// poll cycle's ingress drain.
252 ///
253 /// Pre-reserves capacity for the bounded queue's full length so the
254 /// drain Vec grows once at construction, not in `O(log n)`
255 /// reallocations as events pop. The queue itself caps inflight at
256 /// `self.capacity()`; the drain is bounded by the same cap, so the
257 /// upfront reservation is the exact-fit answer.
258 pub fn drain_all(&self) -> Vec<IngressEvent> {
259 let mut out = Vec::with_capacity(self.queue.capacity().unwrap_or(0));
260 while let Ok(event) = self.queue.pop() {
261 out.push(event);
262 }
263 out
264 }
265
266 /// Register the engine's waker so future pushes can wake it.
267 pub fn register_waker(&self, waker: &Waker) {
268 self.waker.register(waker);
269 }
270
271 /// `true` when the queue currently holds no events.
272 pub fn is_empty(&self) -> bool {
273 self.queue.is_empty()
274 }
275
276 /// Approximate current queue depth. The underlying
277 /// `concurrent-queue` returns an approximate `len` for the
278 /// MPMC case; introspection callers should treat
279 /// this as a snapshot, not a real-time invariant.
280 pub fn len(&self) -> usize {
281 self.queue.len()
282 }
283
284 /// Bounded capacity supplied at construction. `concurrent-queue`
285 /// guarantees `Some(cap)` for bounded queues, so unwrapping is
286 /// safe for the framework's path that never builds an unbounded
287 /// ingress queue.
288 pub fn capacity(&self) -> usize {
289 self.queue.capacity().unwrap_or(usize::MAX)
290 }
291
292 /// Total events dropped due to the queue being full since this
293 /// queue was constructed. Telemetry hook for transport adapters
294 /// + Node introspection.
295 pub fn dropped_overflow(&self) -> u64 {
296 self.dropped_overflow.load(Ordering::Relaxed)
297 }
298}
299
300impl Default for IngressQueue {
301 fn default() -> Self {
302 Self::new()
303 }
304}
305
306/// Cheap-clone handle to the shared [`IngressQueue`] surfaced by
307/// [`crate::node::Node::ingress_handle`].
308/// Behaves identically to `Arc<IngressQueue>` via `Deref` so
309/// callers can `.push(IngressEvent::...)` directly. The newtype
310/// wrapper isolates the public API from the underlying smart-pointer
311/// choice.
312#[derive(Clone)]
313pub struct IngressQueueRef(Arc<IngressQueue>);
314
315impl IngressQueueRef {
316 /// Wrap an existing `Arc<IngressQueue>`. Used by `Node` after
317 /// borrowing from the inner engine.
318 pub fn new(queue: Arc<IngressQueue>) -> Self {
319 Self(queue)
320 }
321}
322
323impl IngressQueueRef {
324 /// Borrow the underlying `Arc<IngressQueue>`. Used by transport
325 /// adapters and in-process test buses that need to share the
326 /// queue across threads — both pin a per-Node queue handle and
327 /// push events as the transport receives them.
328 pub fn arc(&self) -> &Arc<IngressQueue> {
329 &self.0
330 }
331}
332
333impl Deref for IngressQueueRef {
334 type Target = IngressQueue;
335
336 fn deref(&self) -> &Self::Target {
337 &self.0
338 }
339}
340
341impl std::fmt::Debug for IngressQueueRef {
342 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343 f.debug_struct("IngressQueueRef")
344 .field("len", &self.len())
345 .field("dropped_overflow", &self.dropped_overflow())
346 .finish()
347 }
348}
349