Skip to main content

bb_runtime/framework/
outbound_queue.rs

//! `OutboundQueue` - FIFO of wire envelopes ready to ship.
//!
//! Used by Phase 8 of the engine's poll cycle: drained on each
//! cycle into `EngineStep::SendEnvelope` outputs. Carries an
//! optional FIFO-drop cap (`NodeConfig.max_outbound_queue`); when
//! a push would exceed the cap, the oldest envelope is evicted
//! and a counter increments. Phase 8 reads and resets the counter
//! to emit `EngineStep::OutboundDropped { count }`.
9
10use crate::envelope::{WireEnvelope, ENVELOPE_SCHEMA_VERSION};
11use std::collections::VecDeque;
12
13/// FIFO of wire envelopes ready to ship.
14#[derive(Default)]
15pub struct OutboundQueue {
16    queue: VecDeque<WireEnvelope>,
17    cap: Option<usize>,
18    dropped_since_last_drain: usize,
19}
20
21impl OutboundQueue {
22    /// Construct a fresh outbound queue with no cap.
23    pub fn new() -> Self {
24        Self::default()
25    }
26
27    /// Construct a fresh outbound queue with the given FIFO-drop
28    /// cap. `None` disables the cap.
29    pub fn with_cap(cap: Option<usize>) -> Self {
30        Self {
31            queue: VecDeque::new(),
32            cap,
33            dropped_since_last_drain: 0,
34        }
35    }
36
37    /// Set the FIFO-drop cap. `None` removes the cap.
38    pub fn set_cap(&mut self, cap: Option<usize>) {
39        self.cap = cap;
40    }
41
42    /// Push an envelope for shipment. If a cap is set and the
43    /// queue is at the cap, FIFO-evict the oldest entry and bump
44    /// the drop counter.
45    ///
46    /// Stamps `schema_version` here so the encode boundary can read
47    /// `&WireEnvelope` and call `encode_to_vec` without cloning the
48    /// payload to set the field defensively.
49    pub fn push(&mut self, mut env: WireEnvelope) {
50        if env.schema_version == 0 {
51            env.schema_version = ENVELOPE_SCHEMA_VERSION;
52        }
53        if let Some(cap) = self.cap {
54            while self.queue.len() >= cap {
55                self.queue.pop_front();
56                self.dropped_since_last_drain += 1;
57            }
58        }
59        self.queue.push_back(env);
60    }
61
62    /// Drain all queued envelopes. Called by Phase 8 of the poll
63    /// cycle.
64    pub fn drain_all(&mut self) -> Vec<WireEnvelope> {
65        self.queue.drain(..).collect()
66    }
67
68    /// Read + reset the count of FIFO-dropped envelopes since the
69    /// last call. Returns 0 when no drops occurred.
70    pub fn take_dropped_count(&mut self) -> usize {
71        std::mem::take(&mut self.dropped_since_last_drain)
72    }
73
74    /// Number of queued envelopes.
75    pub fn len(&self) -> usize {
76        self.queue.len()
77    }
78
79    /// Whether the queue is empty.
80    pub fn is_empty(&self) -> bool {
81        self.queue.is_empty()
82    }
83
84    /// Iterate queued envelopes for snapshot capture. The queue
85    /// is not consumed - Phase 8 still drains on the next poll.
86    /// .
87    pub fn iter_for_snapshot(&self) -> impl Iterator<Item = &WireEnvelope> {
88        self.queue.iter()
89    }
90}
91