noxu_rep/stream/
output_thread.rs1use noxu_sync::Mutex;
8use std::sync::atomic::{AtomicBool, Ordering};
9
10pub struct OutputQueue {
16 queue: Mutex<Vec<Vec<u8>>>,
18 shutdown: AtomicBool,
20 max_queue_size: usize,
22}
23
24impl OutputQueue {
25 pub fn new(max_queue_size: usize) -> Self {
27 OutputQueue {
28 queue: Mutex::new(Vec::new()),
29 shutdown: AtomicBool::new(false),
30 max_queue_size,
31 }
32 }
33
34 pub fn enqueue(&self, message: Vec<u8>) -> bool {
39 if self.shutdown.load(Ordering::Acquire) {
40 return false;
41 }
42 let mut queue = self.queue.lock();
43 if queue.len() >= self.max_queue_size {
44 return false;
45 }
46 queue.push(message);
47 true
48 }
49
50 pub fn dequeue_batch(&self, max: usize) -> Vec<Vec<u8>> {
55 let mut queue = self.queue.lock();
56 let count = max.min(queue.len());
57 queue.drain(..count).collect()
58 }
59
60 pub fn len(&self) -> usize {
62 self.queue.lock().len()
63 }
64
65 pub fn is_empty(&self) -> bool {
67 self.queue.lock().is_empty()
68 }
69
70 pub fn shutdown(&self) {
72 self.shutdown.store(true, Ordering::Release);
73 }
74
75 pub fn is_shutdown(&self) -> bool {
77 self.shutdown.load(Ordering::Acquire)
78 }
79
80 pub fn clear(&self) {
82 self.queue.lock().clear();
83 }
84}
85
86impl std::fmt::Debug for OutputQueue {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 f.debug_struct("OutputQueue")
89 .field("len", &self.len())
90 .field("max_queue_size", &self.max_queue_size)
91 .field("shutdown", &self.is_shutdown())
92 .finish()
93 }
94}
95
96#[cfg(test)]
97mod tests {
98 use super::*;
99
100 #[test]
101 fn test_new_queue() {
102 let q = OutputQueue::new(10);
103 assert!(q.is_empty());
104 assert_eq!(q.len(), 0);
105 assert!(!q.is_shutdown());
106 }
107
108 #[test]
109 fn test_enqueue_dequeue() {
110 let q = OutputQueue::new(10);
111 assert!(q.enqueue(vec![1, 2, 3]));
112 assert!(q.enqueue(vec![4, 5]));
113 assert_eq!(q.len(), 2);
114 assert!(!q.is_empty());
115
116 let batch = q.dequeue_batch(10);
117 assert_eq!(batch.len(), 2);
118 assert_eq!(batch[0], vec![1, 2, 3]);
119 assert_eq!(batch[1], vec![4, 5]);
120 assert!(q.is_empty());
121 }
122
123 #[test]
124 fn test_batch_dequeue_partial() {
125 let q = OutputQueue::new(100);
126 for i in 0..10 {
127 q.enqueue(vec![i]);
128 }
129 assert_eq!(q.len(), 10);
130
131 let batch = q.dequeue_batch(3);
132 assert_eq!(batch.len(), 3);
133 assert_eq!(batch[0], vec![0]);
134 assert_eq!(batch[1], vec![1]);
135 assert_eq!(batch[2], vec![2]);
136 assert_eq!(q.len(), 7);
137
138 let batch2 = q.dequeue_batch(5);
139 assert_eq!(batch2.len(), 5);
140 assert_eq!(q.len(), 2);
141 }
142
143 #[test]
144 fn test_capacity_limit() {
145 let q = OutputQueue::new(3);
146 assert!(q.enqueue(vec![1]));
147 assert!(q.enqueue(vec![2]));
148 assert!(q.enqueue(vec![3]));
149 assert!(!q.enqueue(vec![4]));
151 assert_eq!(q.len(), 3);
152
153 q.dequeue_batch(1);
155 assert!(q.enqueue(vec![4]));
156 assert_eq!(q.len(), 3);
157 }
158
159 #[test]
160 fn test_shutdown_rejects_enqueue() {
161 let q = OutputQueue::new(10);
162 assert!(q.enqueue(vec![1]));
163 q.shutdown();
164 assert!(q.is_shutdown());
165 assert!(!q.enqueue(vec![2]));
166 let batch = q.dequeue_batch(10);
168 assert_eq!(batch.len(), 1);
169 }
170
171 #[test]
172 fn test_clear() {
173 let q = OutputQueue::new(10);
174 q.enqueue(vec![1]);
175 q.enqueue(vec![2]);
176 q.enqueue(vec![3]);
177 assert_eq!(q.len(), 3);
178
179 q.clear();
180 assert!(q.is_empty());
181 assert_eq!(q.len(), 0);
182 }
183
184 #[test]
185 fn test_dequeue_empty() {
186 let q = OutputQueue::new(10);
187 let batch = q.dequeue_batch(5);
188 assert!(batch.is_empty());
189 }
190
191 #[test]
192 fn test_dequeue_batch_zero() {
193 let q = OutputQueue::new(10);
194 q.enqueue(vec![1]);
195 let batch = q.dequeue_batch(0);
196 assert!(batch.is_empty());
197 assert_eq!(q.len(), 1);
198 }
199
200 #[test]
201 fn test_shutdown_then_clear() {
202 let q = OutputQueue::new(10);
203 q.enqueue(vec![1]);
204 q.enqueue(vec![2]);
205 q.shutdown();
206 q.clear();
207 assert!(q.is_empty());
208 assert!(q.is_shutdown());
209 }
210
211 #[test]
212 fn test_concurrent_enqueue() {
213 use std::sync::Arc;
214 use std::thread;
215
216 let q = Arc::new(OutputQueue::new(1000));
217 let mut handles = vec![];
218
219 for t in 0..4 {
220 let queue = Arc::clone(&q);
221 handles.push(thread::spawn(move || {
222 let mut count = 0;
223 for i in 0..100 {
224 if queue.enqueue(vec![t, i]) {
225 count += 1;
226 }
227 }
228 count
229 }));
230 }
231
232 let total: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();
233 assert_eq!(q.len(), total);
234 assert!(total <= 400);
235 }
236
237 #[test]
238 fn test_debug_format() {
239 let q = OutputQueue::new(42);
240 q.enqueue(vec![1]);
241 let debug = format!("{:?}", q);
242 assert!(debug.contains("OutputQueue"));
243 assert!(debug.contains("max_queue_size: 42"));
244 }
245}