bb_runtime/framework/
serialize_queue.rs1use std::collections::{HashMap, VecDeque};
12
13pub struct SerializeQueue {
15 queues: HashMap<String, VecDeque<Vec<u8>>>,
16 drops: HashMap<String, u64>,
17 per_queue_cap: usize,
18}
19
20impl Default for SerializeQueue {
21 fn default() -> Self {
22 Self {
23 queues: HashMap::new(),
24 drops: HashMap::new(),
25 per_queue_cap: Self::DEFAULT_PER_QUEUE_CAP,
26 }
27 }
28}
29
30impl SerializeQueue {
31 pub const DEFAULT_PER_QUEUE_CAP: usize = 4096;
36
37 pub fn new() -> Self {
39 Self::default()
40 }
41
42 pub fn with_per_queue_cap(per_queue_cap: usize) -> Self {
45 Self {
46 queues: HashMap::new(),
47 drops: HashMap::new(),
48 per_queue_cap,
49 }
50 }
51
52 pub fn enqueue(&mut self, name: &str, bytes: Vec<u8>) {
56 let queue = self.queues.entry(name.to_string()).or_default();
57 if queue.len() >= self.per_queue_cap {
58 queue.pop_front();
59 *self.drops.entry(name.to_string()).or_default() += 1;
60 }
61 queue.push_back(bytes);
62 }
63
64 pub fn dequeue(&mut self, name: &str) -> Option<Vec<u8>> {
67 self.queues.get_mut(name).and_then(|q| q.pop_front())
68 }
69
70 pub fn len(&self, name: &str) -> usize {
72 self.queues.get(name).map(|q| q.len()).unwrap_or(0)
73 }
74
75 pub fn drops(&self, name: &str) -> u64 {
78 self.drops.get(name).copied().unwrap_or(0)
79 }
80}
81
82#[cfg(test)]
83mod tests {
84 use super::*;
85 #[test]
86 fn enqueue_dequeue_roundtrip() {
87 let mut q = SerializeQueue::new();
88 q.enqueue("foo", b"a".to_vec());
89 q.enqueue("foo", b"b".to_vec());
90 assert_eq!(q.len("foo"), 2);
91 assert_eq!(q.dequeue("foo"), Some(b"a".to_vec()));
92 assert_eq!(q.dequeue("foo"), Some(b"b".to_vec()));
93 assert_eq!(q.dequeue("foo"), None);
94 }
95
96 #[test]
97 fn enqueue_at_cap_drops_oldest_and_ticks_counter() {
98 let mut q = SerializeQueue::with_per_queue_cap(2);
101 q.enqueue("foo", b"a".to_vec());
102 q.enqueue("foo", b"b".to_vec());
103 assert_eq!(q.len("foo"), 2);
104 assert_eq!(q.drops("foo"), 0);
105 q.enqueue("foo", b"c".to_vec());
106 assert_eq!(q.len("foo"), 2);
107 assert_eq!(q.drops("foo"), 1);
108 assert_eq!(q.dequeue("foo"), Some(b"b".to_vec()));
110 assert_eq!(q.dequeue("foo"), Some(b"c".to_vec()));
111 }
112}