Skip to main content

bb_runtime/framework/
serialize_queue.rs

//! `SerializeQueue` - named-FIFO map for `Serialize.Enqueue` /
//! `Serialize.Dequeue` syscalls.
//!
//! Every per-key FIFO is bounded and has a drop counter, so a hostile
//! or buggy producer cannot grow any single queue without bound.
//! `enqueue` drops the OLDEST entry on overflow (FIFO eviction)
//! and ticks the per-key drop counter. The default cap is
//! [`SerializeQueue::DEFAULT_PER_QUEUE_CAP`]; production deployments
//! override via `NodeConfig`.
10
11use std::collections::{HashMap, VecDeque};
12
/// Named-FIFO map with per-queue bounded capacity.
///
/// Each name maps to its own FIFO of byte payloads plus a cumulative
/// counter of payloads discarded because that FIFO was full. Only the
/// depth of each individual FIFO is bounded; the number of distinct
/// names is not limited by this type.
#[derive(Debug, Clone)]
pub struct SerializeQueue {
    /// Payloads per queue name, oldest at the front.
    queues: HashMap<String, VecDeque<Vec<u8>>>,
    /// Cumulative number of discarded payloads per queue name.
    drops: HashMap<String, u64>,
    /// Maximum number of payloads any single named FIFO may hold.
    per_queue_cap: usize,
}

impl Default for SerializeQueue {
    /// Empty map using [`SerializeQueue::DEFAULT_PER_QUEUE_CAP`].
    fn default() -> Self {
        Self::with_per_queue_cap(Self::DEFAULT_PER_QUEUE_CAP)
    }
}

impl SerializeQueue {
    /// Default per-named-queue capacity. 4096 entries balances
    /// "deep enough to survive a transient publisher burst" against
    /// "small enough to bound the worst-case memory footprint of a
    /// single misbehaving key."
    pub const DEFAULT_PER_QUEUE_CAP: usize = 4096;

    /// Construct a fresh empty map with the default per-queue cap.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Construct with a custom per-queue cap. Useful for edge
    /// deployments tightening the bound below the default.
    ///
    /// A cap of `0` is accepted: such a map never stores anything and
    /// every `enqueue` is counted as a drop.
    #[must_use]
    pub fn with_per_queue_cap(per_queue_cap: usize) -> Self {
        Self {
            queues: HashMap::new(),
            drops: HashMap::new(),
            per_queue_cap,
        }
    }

    /// Enqueue `bytes` into the named FIFO. If the FIFO is already
    /// at capacity, the OLDEST entry is dropped (FIFO eviction)
    /// and the per-key drop counter ticks.
    ///
    /// With a cap of `0` there is no room for any entry, so `bytes`
    /// itself is discarded and counted as the drop.
    pub fn enqueue(&mut self, name: &str, bytes: Vec<u8>) {
        if self.per_queue_cap == 0 {
            // Storing the payload here would leave the FIFO at length 1,
            // exceeding the configured bound.
            self.record_drop(name);
            return;
        }

        // Look up by `&str` first so the common "queue already exists"
        // path does not allocate an owned copy of the name.
        let evicted = match self.queues.get_mut(name) {
            Some(queue) => {
                let full = queue.len() >= self.per_queue_cap;
                if full {
                    queue.pop_front();
                }
                queue.push_back(bytes);
                full
            }
            None => {
                // First payload for this name: cap >= 1, so it always fits.
                let mut queue = VecDeque::new();
                queue.push_back(bytes);
                self.queues.insert(name.to_owned(), queue);
                false
            }
        };

        // Counted after the `queues` borrow has ended.
        if evicted {
            self.record_drop(name);
        }
    }

    /// Dequeue from the head of the named FIFO. Returns `None` if
    /// the FIFO is empty or no FIFO with that name exists.
    pub fn dequeue(&mut self, name: &str) -> Option<Vec<u8>> {
        self.queues.get_mut(name)?.pop_front()
    }

    /// Length of the named FIFO (0 if missing).
    pub fn len(&self, name: &str) -> usize {
        self.queues.get(name).map_or(0, VecDeque::len)
    }

    /// Cumulative drop count for the named FIFO (0 if nothing was
    /// ever dropped for that name). Observability for queue-overflow
    /// conditions.
    pub fn drops(&self, name: &str) -> u64 {
        self.drops.get(name).copied().unwrap_or(0)
    }

    /// Tick the drop counter for `name`, creating it on first use.
    fn record_drop(&mut self, name: &str) {
        match self.drops.get_mut(name) {
            // Saturate rather than wrap so the counter can never fall back to 0.
            Some(count) => *count = count.saturating_add(1),
            None => {
                self.drops.insert(name.to_owned(), 1);
            }
        }
    }
}
81
#[cfg(test)]
mod tests {
    use super::*;

    /// Payloads come back out in the order they went in, and an
    /// exhausted FIFO yields `None`.
    #[test]
    fn enqueue_dequeue_roundtrip() {
        let mut q = SerializeQueue::new();
        q.enqueue("foo", b"a".to_vec());
        q.enqueue("foo", b"b".to_vec());
        assert_eq!(q.len("foo"), 2);
        assert_eq!(q.dequeue("foo"), Some(b"a".to_vec()));
        assert_eq!(q.dequeue("foo"), Some(b"b".to_vec()));
        assert_eq!(q.dequeue("foo"), None);
    }

    /// Bounded queue: at cap, the oldest entry gets evicted and the
    /// drop counter increments.
    #[test]
    fn enqueue_at_cap_drops_oldest_and_ticks_counter() {
        let mut q = SerializeQueue::with_per_queue_cap(2);
        q.enqueue("foo", b"a".to_vec());
        q.enqueue("foo", b"b".to_vec());
        assert_eq!(q.len("foo"), 2);
        assert_eq!(q.drops("foo"), 0);
        q.enqueue("foo", b"c".to_vec());
        assert_eq!(q.len("foo"), 2);
        assert_eq!(q.drops("foo"), 1);
        // Oldest ("a") is gone; "b" + "c" remain in FIFO order.
        assert_eq!(q.dequeue("foo"), Some(b"b".to_vec()));
        assert_eq!(q.dequeue("foo"), Some(b"c".to_vec()));
    }

    /// A name that was never enqueued reads as an empty FIFO with no
    /// drops.
    #[test]
    fn missing_name_reads_as_empty() {
        let mut q = SerializeQueue::new();
        assert_eq!(q.len("nope"), 0);
        assert_eq!(q.drops("nope"), 0);
        assert_eq!(q.dequeue("nope"), None);
    }

    /// Overflow on one name must not evict from, or count against,
    /// another name.
    #[test]
    fn queues_and_drop_counters_are_per_name() {
        let mut q = SerializeQueue::with_per_queue_cap(1);
        q.enqueue("a", b"1".to_vec());
        q.enqueue("a", b"2".to_vec()); // evicts "1" from "a" only
        q.enqueue("b", b"x".to_vec());
        assert_eq!(q.drops("a"), 1);
        assert_eq!(q.drops("b"), 0);
        assert_eq!(q.dequeue("a"), Some(b"2".to_vec()));
        assert_eq!(q.dequeue("b"), Some(b"x".to_vec()));
    }
}