bb_runtime/framework/outbound_queue.rs
1//! `OutboundQueue` - FIFO of wire envelopes ready to ship.
2//!
3//! Used by Phase 8 of the engine's poll cycle: drained on each
4//! cycle into `EngineStep::SendEnvelope` outputs. Carries an
5//! optional FIFO-drop cap (`NodeConfig.max_outbound_queue`); when
6//! a push would exceed the cap, the oldest envelope is evicted
7//! and a counter increments. Phase 8 reads + resets the counter to
8//! emit `EngineStep::OutboundDropped { count }`.
9
10use crate::envelope::{WireEnvelope, ENVELOPE_SCHEMA_VERSION};
11use std::collections::VecDeque;
12
13/// FIFO of wire envelopes ready to ship.
14#[derive(Default)]
15pub struct OutboundQueue {
16 queue: VecDeque<WireEnvelope>,
17 cap: Option<usize>,
18 dropped_since_last_drain: usize,
19}
20
21impl OutboundQueue {
22 /// Construct a fresh outbound queue with no cap.
23 pub fn new() -> Self {
24 Self::default()
25 }
26
27 /// Construct a fresh outbound queue with the given FIFO-drop
28 /// cap. `None` disables the cap.
29 pub fn with_cap(cap: Option<usize>) -> Self {
30 Self {
31 queue: VecDeque::new(),
32 cap,
33 dropped_since_last_drain: 0,
34 }
35 }
36
37 /// Set the FIFO-drop cap. `None` removes the cap.
38 pub fn set_cap(&mut self, cap: Option<usize>) {
39 self.cap = cap;
40 }
41
42 /// Push an envelope for shipment. If a cap is set and the
43 /// queue is at the cap, FIFO-evict the oldest entry and bump
44 /// the drop counter.
45 ///
46 /// Stamps `schema_version` here so the encode boundary can read
47 /// `&WireEnvelope` and call `encode_to_vec` without cloning the
48 /// payload to set the field defensively.
49 pub fn push(&mut self, mut env: WireEnvelope) {
50 if env.schema_version == 0 {
51 env.schema_version = ENVELOPE_SCHEMA_VERSION;
52 }
53 if let Some(cap) = self.cap {
54 while self.queue.len() >= cap {
55 self.queue.pop_front();
56 self.dropped_since_last_drain += 1;
57 }
58 }
59 self.queue.push_back(env);
60 }
61
62 /// Drain all queued envelopes. Called by Phase 8 of the poll
63 /// cycle.
64 pub fn drain_all(&mut self) -> Vec<WireEnvelope> {
65 self.queue.drain(..).collect()
66 }
67
68 /// Read + reset the count of FIFO-dropped envelopes since the
69 /// last call. Returns 0 when no drops occurred.
70 pub fn take_dropped_count(&mut self) -> usize {
71 std::mem::take(&mut self.dropped_since_last_drain)
72 }
73
74 /// Number of queued envelopes.
75 pub fn len(&self) -> usize {
76 self.queue.len()
77 }
78
79 /// Whether the queue is empty.
80 pub fn is_empty(&self) -> bool {
81 self.queue.is_empty()
82 }
83
84 /// Iterate queued envelopes for snapshot capture. The queue
85 /// is not consumed - Phase 8 still drains on the next poll.
86 /// .
87 pub fn iter_for_snapshot(&self) -> impl Iterator<Item = &WireEnvelope> {
88 self.queue.iter()
89 }
90}
91