Skip to main content

bb_runtime/
ingress.rs

//! Lock-free MPMC ingress queue.
//!
//! External tasks (transport, host invocations, off-thread
//! completions) push `IngressEvent`s onto the queue; the engine
//! drains them on its next poll. Lock-free via `concurrent-queue`
//! v2; the engine sleeps on an `AtomicWaker` until a producer
//! wakes it.
//!
//! `Arc<IngressQueue>` is shared between the engine and any number
//! of external producer tasks running on different threads.
//!
//! Per ENGINE.md §2.2 + §16: the queue is BOUNDED with default
//! capacity `bus_capacity * 4` (= 4096 when bus_capacity uses the
//! spec default of 1024). On overflow, `push` returns
//! `Err(IngressEvent)` so the transport adapter can choose to
//! retry, drop with a metric, or escalate as back-pressure to its
//! upstream. The `dropped_overflow` counter, read via
//! `dropped_overflow()`, counts every push rejected because the
//! queue was full — including pushes the adapter later retries
//! successfully.

20use std::ops::Deref;
21use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
22use std::sync::Arc;
23use std::task::Waker;
24
25use atomic_waker::AtomicWaker;
26use concurrent_queue::{ConcurrentQueue, PushError};
27
28use crate::bus::{AppIngressErrorKind, AppIngressSource};
29use crate::ids::CommandId;
30
/// Hard cap (4 * 1024) on the detail string attached to a `fail()`
/// completion. Oversized details are truncated instead of rejected, so
/// the host's `Display`-rendered failure message is always delivered.
pub const COMPLETION_DETAIL_CAP: usize = 4096;

/// Spec-default bus capacity (ENGINE.md §16). The default ingress
/// queue size is derived from it (4×).
const DEFAULT_BUS_CAPACITY: usize = 1 << 10;

40/// Default ingress capacity: `bus_capacity * 4` per ENGINE.md §2.2.
41pub const DEFAULT_INGRESS_CAPACITY: usize = DEFAULT_BUS_CAPACITY * 4;
42
43/// External-event variants pushed to the ingress queue per
44/// `docs/ENGINE.md` §6 entry points.
45#[derive(Debug)]
46pub enum IngressEvent {
47    /// Inbound wire envelope from the transport layer, attributed
48    /// to a source peer. The engine calls
49    /// `PeerGovernor::check_inbound(src_peer)` on ingress; blocked or
50    /// non-allowlisted peers are dropped before any slot is written,
51    /// surfacing as `EngineStep::PeerBlocked`.
52    EnvelopeFrom {
53        /// Peer the envelope arrived from.
54        src_peer: crate::ids::PeerId,
55        /// The envelope payload.
56        envelope: crate::envelope::WireEnvelope,
57        /// Transport-observed source address, when the adapter can
58        /// supply it (e.g. NAT-translated remote endpoint, dialer's
59        /// observed multiaddr). The receiver merges this into its
60        /// AddressBook entry for `src_peer` so reflexive-address
61        /// discovery composes with the sender-claimed
62        /// `envelope.src_peer_addresses` list. `None` means the
63        /// transport didn't surface an observed address.
64        src_observed_address: Option<crate::framework::Address>,
65    },
66
67    /// Host pushed an app event onto a Module input.
68    AppEvent {
69        /// Target Module's name.
70        module_name: String,
71        /// Module input port name.
72        input_name: String,
73        /// Encoded value payload.
74        value_bytes: Vec<u8>,
75    },
76
77    /// External timer maturity signal (used when an off-thread
78    /// scheduler drives the engine).
79    TimerMatured {
80        /// Maturity timestamp (nanoseconds).
81        at_ns: u64,
82    },
83
84    /// Explicit Module invocation from host.
85    Invoke {
86        /// Target Module's name.
87        module_name: String,
88        /// `(input_name, value_bytes)` pairs.
89        inputs: Vec<(String, Vec<u8>)>,
90        /// `ExecId` allocated by `Node::invoke` so the host can
91        /// correlate `EngineStep::AppEvent` / `OpCompleted` /
92        /// `AsyncSuspended` outputs back to the originating call.
93        exec_id: crate::ids::ExecId,
94    },
95
96    /// External (off-thread) async completion landing back at the
97    /// engine.
98    Completion {
99        /// The `CommandId` being fulfilled.
100        cmd_id: CommandId,
101        /// Encoded output payloads.
102        results: Vec<Vec<u8>>,
103    },
104
105    /// Async completion FAILURE landing back at the engine.
106    /// Distinct from `Completion`: `CompletionSink::fail` mints
107    /// this variant directly so `handle_completion_failed` can
108    /// route to the typed `OpFailed` surface — the host sees a
109    /// real error, not a success-bytes masquerade.
110    CompletionFailed {
111        /// The `CommandId` whose await failed.
112        cmd_id: CommandId,
113        /// Human-readable failure detail; the runtime wraps it
114        /// into `bus::OpError` on the engine side.
115        detail: String,
116    },
117
118    /// Transport-side send-outcome failure surfaced
119    /// by an adapter (libp2p, sim, etc.) when the network NAKed an
120    /// outbound envelope or its delivery deadline elapsed without
121    /// an ACK. Distinct from `CompletionFailed` (which covers
122    /// off-thread compute completion); this variant covers
123    /// transport-layer delivery failure.
124    SendFailed {
125        /// The wire request id of the failed outbound envelope.
126        wire_req_id: u64,
127        /// The destination peer that NAKed or timed out (raw
128        /// multihash bytes so the engine can reconstruct
129        /// `PeerId::from_bytes(&peer)`).
130        peer: Vec<u8>,
131        /// Stable diagnostic label (e.g. `"nak"`, `"timeout"`,
132        /// `"network_unreachable"`). Adapters pick from a fixed
133        /// vocabulary so consumers can match on the label.
134        reason: &'static str,
135    },
136
137    /// Off-thread application-ingress failure (currently only
138    /// `CompletionSink::complete` exceeding the per-completion result
139    /// cap). The engine drains this variant and publishes a matching
140    /// `InfraEvent::AppIngressError` on the bus so subscribers see the
141    /// rejection. The synchronous `Node::deliver_event` / `Node::invoke`
142    /// path publishes directly with `&mut bus` access; this variant is
143    /// the cross-thread bridge for sinks that don't hold a bus
144    /// reference. The Component observes an async-op timeout in place
145    /// of the dropped completion.
146    AppIngressError {
147        /// Which application-side entry point raised the failure.
148        source: AppIngressSource,
149        /// Bytes the boundary was asked to admit.
150        byte_count: usize,
151        /// Which failure mode fired.
152        kind: AppIngressErrorKind,
153    },
154}
155
156impl IngressEvent {
157    /// Construct an `EnvelopeFrom` for the in-process router common
158    /// case where the transport carries no NAT and the observed
159    /// address is the sender's PeerId-tagged multiaddr. Test buses
160    /// and the in-process router call this so observed-address
161    /// propagation exercises the same merge path as a real
162    /// transport's reflexive surface.
163    pub fn from_in_process(
164        src_peer: crate::ids::PeerId,
165        envelope: crate::envelope::WireEnvelope,
166    ) -> Self {
167        Self::EnvelopeFrom {
168            src_peer,
169            envelope,
170            src_observed_address: Some(crate::framework::Address::empty().p2p(src_peer)),
171        }
172    }
173}
174
175/// Lock-free MPMC ingress queue + waker. Multiple external
176/// producers may `push` concurrently; the engine's single consumer
177/// drains via `drain_all` on each poll cycle.
178pub struct IngressQueue {
179    queue: ConcurrentQueue<IngressEvent>,
180    waker: AtomicWaker,
181    dropped_overflow: AtomicU64,
182    /// Per-`CompletionSink::complete` result cap sourced from
183    /// `NodeConfig::max_completion_result_bytes` via
184    /// `apply_config_caps`. Defaults to `usize::MAX` (no cap) so
185    /// constructions outside the `Node::new` → `apply_config_caps`
186    /// path (test fixtures, snapshot reseed) behave like the
187    /// pre-cap world.
188    completion_result_cap: AtomicUsize,
189}
190
191impl IngressQueue {
192    /// Construct a fresh ingress queue with the default capacity
193    /// ([`DEFAULT_INGRESS_CAPACITY`]).
194    pub fn new() -> Self {
195        Self::with_capacity(DEFAULT_INGRESS_CAPACITY)
196    }
197
198    /// Construct a fresh ingress queue with the supplied bounded
199    /// capacity. Per ENGINE.md §2.2 the canonical sizing is
200    /// `bus_capacity * 4`; pass the host's chosen bus_capacity
201    /// multiplied by 4 to match.
202    pub fn with_capacity(capacity: usize) -> Self {
203        Self {
204            queue: ConcurrentQueue::bounded(capacity),
205            waker: AtomicWaker::new(),
206            dropped_overflow: AtomicU64::new(0),
207            completion_result_cap: AtomicUsize::new(usize::MAX),
208        }
209    }
210
211    /// Install the per-`CompletionSink::complete` result cap.
212    /// `Engine::apply_config_caps` calls this from
213    /// `NodeConfig::max_completion_result_bytes` so off-thread
214    /// completions see the configured cap without the sink needing a
215    /// reference to `NodeConfig`.
216    pub(crate) fn set_completion_result_cap(&self, cap: usize) {
217        self.completion_result_cap.store(cap, Ordering::Relaxed);
218    }
219
220    /// Per-`complete()` result-byte cap. Defaults to `usize::MAX`
221    /// when not configured; `apply_config_caps` reseeds it from
222    /// `NodeConfig::max_completion_result_bytes`.
223    pub fn completion_result_cap(&self) -> usize {
224        self.completion_result_cap.load(Ordering::Relaxed)
225    }
226
227    /// Push an event. On success returns `Ok(())` and wakes the
228    /// engine if it's sleeping. On a full queue the event comes
229    /// back in `Err(_)` and the `dropped_overflow` counter is
230    /// incremented; transport adapters decide whether to retry,
231    /// drop with a metric, or escalate as back-pressure. The
232    /// `IngressEvent` Err variant is large (carries a
233    /// `WireEnvelope` with multihash PeerIds); transport adapters
234    /// already box or re-queue, so the cost lives at the boundary.
235    #[allow(clippy::result_large_err)]
236    pub fn push(&self, event: IngressEvent) -> Result<(), IngressEvent> {
237        match self.queue.push(event) {
238            Ok(()) => {
239                self.waker.wake();
240                Ok(())
241            }
242            Err(PushError::Full(ev)) => {
243                self.dropped_overflow.fetch_add(1, Ordering::Relaxed);
244                Err(ev)
245            }
246            Err(PushError::Closed(ev)) => Err(ev),
247        }
248    }
249
250    /// Drain all available events. Called by the engine on each
251    /// poll cycle's ingress drain.
252    ///
253    /// Pre-reserves capacity for the bounded queue's full length so the
254    /// drain Vec grows once at construction, not in `O(log n)`
255    /// reallocations as events pop. The queue itself caps inflight at
256    /// `self.capacity()`; the drain is bounded by the same cap, so the
257    /// upfront reservation is the exact-fit answer.
258    pub fn drain_all(&self) -> Vec<IngressEvent> {
259        let mut out = Vec::with_capacity(self.queue.capacity().unwrap_or(0));
260        while let Ok(event) = self.queue.pop() {
261            out.push(event);
262        }
263        out
264    }
265
266    /// Register the engine's waker so future pushes can wake it.
267    pub fn register_waker(&self, waker: &Waker) {
268        self.waker.register(waker);
269    }
270
271    /// `true` when the queue currently holds no events.
272    pub fn is_empty(&self) -> bool {
273        self.queue.is_empty()
274    }
275
276    /// Approximate current queue depth. The underlying
277    /// `concurrent-queue` returns an approximate `len` for the
278    /// MPMC case; introspection callers should treat
279    /// this as a snapshot, not a real-time invariant.
280    pub fn len(&self) -> usize {
281        self.queue.len()
282    }
283
284    /// Bounded capacity supplied at construction. `concurrent-queue`
285    /// guarantees `Some(cap)` for bounded queues, so unwrapping is
286    /// safe for the framework's path that never builds an unbounded
287    /// ingress queue.
288    pub fn capacity(&self) -> usize {
289        self.queue.capacity().unwrap_or(usize::MAX)
290    }
291
292    /// Total events dropped due to the queue being full since this
293    /// queue was constructed. Telemetry hook for transport adapters
294    /// + Node introspection.
295    pub fn dropped_overflow(&self) -> u64 {
296        self.dropped_overflow.load(Ordering::Relaxed)
297    }
298}
299
300impl Default for IngressQueue {
301    fn default() -> Self {
302        Self::new()
303    }
304}
305
306/// Cheap-clone handle to the shared [`IngressQueue`] surfaced by
307/// [`crate::node::Node::ingress_handle`].
308/// Behaves identically to `Arc<IngressQueue>` via `Deref` so
309/// callers can `.push(IngressEvent::...)` directly. The newtype
310/// wrapper isolates the public API from the underlying smart-pointer
311/// choice.
312#[derive(Clone)]
313pub struct IngressQueueRef(Arc<IngressQueue>);
314
315impl IngressQueueRef {
316    /// Wrap an existing `Arc<IngressQueue>`. Used by `Node` after
317    /// borrowing from the inner engine.
318    pub fn new(queue: Arc<IngressQueue>) -> Self {
319        Self(queue)
320    }
321}
322
323impl IngressQueueRef {
324    /// Borrow the underlying `Arc<IngressQueue>`. Used by transport
325    /// adapters and in-process test buses that need to share the
326    /// queue across threads — both pin a per-Node queue handle and
327    /// push events as the transport receives them.
328    pub fn arc(&self) -> &Arc<IngressQueue> {
329        &self.0
330    }
331}
332
333impl Deref for IngressQueueRef {
334    type Target = IngressQueue;
335
336    fn deref(&self) -> &Self::Target {
337        &self.0
338    }
339}
340
341impl std::fmt::Debug for IngressQueueRef {
342    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343        f.debug_struct("IngressQueueRef")
344            .field("len", &self.len())
345            .field("dropped_overflow", &self.dropped_overflow())
346            .finish()
347    }
348}
349