Skip to main content

noxu_rep/stream/
output_thread.rs

//! Replication output thread abstraction.
//!
//! Provides a bounded, thread-safe queue for outbound replication messages.
//! The `OutputQueue` decouples message production (by the feeder or
//! replication logic) from message consumption (by the network I/O thread).
6
7use noxu_sync::Mutex;
8use std::sync::atomic::{AtomicBool, Ordering};
9
10/// Manages a queue of outbound replication messages.
11///
12/// The queue has a configurable maximum size. When the queue is full or
13/// shut down, `enqueue` returns `false`. Messages can be dequeued in
14/// batches for efficient network I/O.
15pub struct OutputQueue {
16    /// The message queue.
17    queue: Mutex<Vec<Vec<u8>>>,
18    /// Whether the queue has been shut down.
19    shutdown: AtomicBool,
20    /// Maximum number of messages the queue can hold.
21    max_queue_size: usize,
22}
23
24impl OutputQueue {
25    /// Create a new output queue with the given maximum capacity.
26    pub fn new(max_queue_size: usize) -> Self {
27        OutputQueue {
28            queue: Mutex::new(Vec::new()),
29            shutdown: AtomicBool::new(false),
30            max_queue_size,
31        }
32    }
33
34    /// Enqueue a message.
35    ///
36    /// Returns `true` if the message was accepted, `false` if the queue
37    /// is full or has been shut down.
38    pub fn enqueue(&self, message: Vec<u8>) -> bool {
39        if self.shutdown.load(Ordering::Acquire) {
40            return false;
41        }
42        let mut queue = self.queue.lock();
43        if queue.len() >= self.max_queue_size {
44            return false;
45        }
46        queue.push(message);
47        true
48    }
49
50    /// Dequeue up to `max` messages in a batch.
51    ///
52    /// Returns the messages in FIFO order. If fewer than `max` messages
53    /// are available, returns all of them.
54    pub fn dequeue_batch(&self, max: usize) -> Vec<Vec<u8>> {
55        let mut queue = self.queue.lock();
56        let count = max.min(queue.len());
57        queue.drain(..count).collect()
58    }
59
60    /// Return the number of messages currently in the queue.
61    pub fn len(&self) -> usize {
62        self.queue.lock().len()
63    }
64
65    /// Return true if the queue is empty.
66    pub fn is_empty(&self) -> bool {
67        self.queue.lock().is_empty()
68    }
69
70    /// Shut down the queue. No more messages will be accepted.
71    pub fn shutdown(&self) {
72        self.shutdown.store(true, Ordering::Release);
73    }
74
75    /// Return true if the queue has been shut down.
76    pub fn is_shutdown(&self) -> bool {
77        self.shutdown.load(Ordering::Acquire)
78    }
79
80    /// Clear all messages from the queue.
81    pub fn clear(&self) {
82        self.queue.lock().clear();
83    }
84}
85
86impl std::fmt::Debug for OutputQueue {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        f.debug_struct("OutputQueue")
89            .field("len", &self.len())
90            .field("max_queue_size", &self.max_queue_size)
91            .field("shutdown", &self.is_shutdown())
92            .finish()
93    }
94}
95
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created queue is empty and not shut down.
    #[test]
    fn test_new_queue() {
        let q = OutputQueue::new(10);
        assert!(q.is_empty());
        assert_eq!(q.len(), 0);
        assert!(!q.is_shutdown());
    }

    /// Messages come back out in the order they went in.
    #[test]
    fn test_enqueue_dequeue() {
        let q = OutputQueue::new(10);
        assert!(q.enqueue(vec![1, 2, 3]));
        assert!(q.enqueue(vec![4, 5]));
        assert_eq!(q.len(), 2);
        assert!(!q.is_empty());

        let batch = q.dequeue_batch(10);
        assert_eq!(batch.len(), 2);
        assert_eq!(batch[0], vec![1, 2, 3]);
        assert_eq!(batch[1], vec![4, 5]);
        assert!(q.is_empty());
    }

    /// Successive partial batches continue where the previous one stopped.
    #[test]
    fn test_batch_dequeue_partial() {
        let q = OutputQueue::new(100);
        for i in 0..10u8 {
            assert!(q.enqueue(vec![i]));
        }
        assert_eq!(q.len(), 10);

        let batch = q.dequeue_batch(3);
        assert_eq!(batch.len(), 3);
        assert_eq!(batch[0], vec![0]);
        assert_eq!(batch[1], vec![1]);
        assert_eq!(batch[2], vec![2]);
        assert_eq!(q.len(), 7);

        let batch2 = q.dequeue_batch(5);
        let expected: Vec<Vec<u8>> = (3u8..8).map(|i| vec![i]).collect();
        assert_eq!(batch2, expected);
        assert_eq!(q.len(), 2);
    }

    /// A full queue rejects new messages until space is freed.
    #[test]
    fn test_capacity_limit() {
        let q = OutputQueue::new(3);
        assert!(q.enqueue(vec![1]));
        assert!(q.enqueue(vec![2]));
        assert!(q.enqueue(vec![3]));
        // Queue is full.
        assert!(!q.enqueue(vec![4]));
        assert_eq!(q.len(), 3);

        // After draining one, we can enqueue again.
        assert_eq!(q.dequeue_batch(1), vec![vec![1]]);
        assert!(q.enqueue(vec![4]));
        assert_eq!(q.len(), 3);
    }

    /// A queue with zero capacity never accepts a message.
    #[test]
    fn test_zero_capacity_rejects_everything() {
        let q = OutputQueue::new(0);
        assert!(!q.enqueue(vec![1]));
        assert!(q.is_empty());
    }

    /// Shutdown blocks new messages but leaves queued ones drainable.
    #[test]
    fn test_shutdown_rejects_enqueue() {
        let q = OutputQueue::new(10);
        assert!(q.enqueue(vec![1]));
        q.shutdown();
        assert!(q.is_shutdown());
        assert!(!q.enqueue(vec![2]));
        // Can still drain existing messages.
        let batch = q.dequeue_batch(10);
        assert_eq!(batch, vec![vec![1]]);
    }

    /// `clear` removes every queued message.
    #[test]
    fn test_clear() {
        let q = OutputQueue::new(10);
        assert!(q.enqueue(vec![1]));
        assert!(q.enqueue(vec![2]));
        assert!(q.enqueue(vec![3]));
        assert_eq!(q.len(), 3);

        q.clear();
        assert!(q.is_empty());
        assert_eq!(q.len(), 0);
    }

    /// Dequeuing from an empty queue yields an empty batch.
    #[test]
    fn test_dequeue_empty() {
        let q = OutputQueue::new(10);
        let batch = q.dequeue_batch(5);
        assert!(batch.is_empty());
    }

    /// A batch size of zero removes nothing.
    #[test]
    fn test_dequeue_batch_zero() {
        let q = OutputQueue::new(10);
        assert!(q.enqueue(vec![1]));
        let batch = q.dequeue_batch(0);
        assert!(batch.is_empty());
        assert_eq!(q.len(), 1);
    }

    /// Clearing after shutdown empties the queue and keeps it shut down.
    #[test]
    fn test_shutdown_then_clear() {
        let q = OutputQueue::new(10);
        assert!(q.enqueue(vec![1]));
        assert!(q.enqueue(vec![2]));
        q.shutdown();
        q.clear();
        assert!(q.is_empty());
        assert!(q.is_shutdown());
    }

    /// FIFO order holds when enqueues and dequeues are interleaved.
    #[test]
    fn test_fifo_order_across_interleaved_operations() {
        let q = OutputQueue::new(4);
        assert!(q.enqueue(vec![1]));
        assert!(q.enqueue(vec![2]));
        assert!(q.enqueue(vec![3]));
        assert_eq!(q.dequeue_batch(2), vec![vec![1], vec![2]]);

        assert!(q.enqueue(vec![4]));
        assert!(q.enqueue(vec![5]));
        assert!(q.enqueue(vec![6]));
        assert_eq!(
            q.dequeue_batch(10),
            vec![vec![3], vec![4], vec![5], vec![6]]
        );
        assert!(q.is_empty());
    }

    /// Concurrent producers never lose or duplicate accepted messages.
    #[test]
    fn test_concurrent_enqueue() {
        use std::sync::Arc;
        use std::thread;

        let q = Arc::new(OutputQueue::new(1000));
        let mut handles = vec![];

        for t in 0..4u8 {
            let queue = Arc::clone(&q);
            handles.push(thread::spawn(move || {
                let mut count = 0usize;
                for i in 0..100u8 {
                    if queue.enqueue(vec![t, i]) {
                        count += 1;
                    }
                }
                count
            }));
        }

        let total: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();
        assert_eq!(q.len(), total);
        // Capacity (1000) exceeds the 400 attempted messages and the queue is
        // never shut down, so every single enqueue must have been accepted.
        assert_eq!(total, 400);
    }

    /// The debug representation reports length, capacity and shutdown state.
    #[test]
    fn test_debug_format() {
        let q = OutputQueue::new(42);
        assert!(q.enqueue(vec![1]));
        let debug = format!("{:?}", q);
        assert!(debug.contains("OutputQueue"));
        assert!(debug.contains("len: 1"));
        assert!(debug.contains("max_queue_size: 42"));
        assert!(debug.contains("shutdown: false"));
    }
}