command_executor/
crossbeam_blocking_queue.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use crossbeam::queue::ArrayQueue;
5
6pub struct CrossbeamBlockingQueue<E> where E: Send + Sync {
7    elements: Arc<ArrayQueue<E>>,
8}
9
10impl<E> CrossbeamBlockingQueue<E> where E: Send + Sync {
11    pub fn new(size: usize) -> CrossbeamBlockingQueue<E> {
12        CrossbeamBlockingQueue::<E> {
13            elements: Arc::new(ArrayQueue::new(size)),
14        }
15    }
16
17    pub fn len(&self) -> usize {
18        self.elements.len()
19    }
20
21    pub fn capacity(&self) -> usize {
22        self.elements.capacity()
23    }
24
25    pub fn is_empty(&self) -> bool {
26        self.elements.is_empty()
27    }
28
29    pub fn is_full(&self) -> bool {
30        self.elements.is_full()
31    }
32
33    pub fn wait_empty(&self, timeout: Duration) -> bool {
34        let backoff = crossbeam::utils::Backoff::new();
35        let mut t = timeout;
36        let mut start = Instant::now();
37        while !self.elements.is_empty() {
38            let elapsed = start.elapsed();
39            if elapsed < t {
40                t -= elapsed;
41                start = Instant::now();
42            } else {
43                break;
44            }
45            backoff.spin();
46        }
47        self.elements.is_empty()
48    }
49
50    pub fn enqueue(&self, element: E) {
51        self.try_enqueue(element, Duration::MAX);
52    }
53
54    pub fn try_enqueue(&self, element: E, timeout: Duration) -> Option<E> {
55        let backoff = crossbeam::utils::Backoff::new();
56        let mut t = timeout;
57        let mut start = Instant::now();
58        let mut e = element;
59        loop {
60            let result = self.elements.push(e);
61            match result {
62                Ok(_) => {
63                    break None;
64                }
65                Err(element) => {
66                    e = element;
67                    let elapsed = start.elapsed();
68                    if elapsed < t {
69                        t -= elapsed;
70                        start = Instant::now();
71                    } else {
72                        break Some(e);
73                    }
74                    backoff.spin();
75                }
76            }
77        }
78    }
79
80    pub fn dequeue(&self) -> Option<E> {
81        self.try_dequeue(Duration::MAX)
82    }
83
84    pub fn try_dequeue(&self, timeout: Duration) -> Option<E> {
85        let backoff = crossbeam::utils::Backoff::new();
86        let mut t = timeout;
87        let mut start = Instant::now();
88        loop {
89            let element = self.elements.pop();
90            if element.is_none() {
91                let elapsed = start.elapsed();
92                if elapsed < t {
93                    t -= elapsed;
94                    start = Instant::now();
95                } else {
96                    break None;
97                }
98            } else {
99                break element;
100            }
101            backoff.spin();
102        }
103    }
104}
105
106#[cfg(test)]
107mod tests {
108    use std::thread::Builder;
109
110    use super::*;
111
112    #[test]
113    fn test_try_dequeue() {
114        let q = CrossbeamBlockingQueue::<i32>::new(128);
115
116        let r = q.try_dequeue(Duration::from_millis(0));
117        assert_eq!(r, None);
118        let r = q.try_dequeue(Duration::from_millis(10));
119        assert_eq!(r, None);
120    }
121
122    #[test]
123    fn test_try_enqueue() {
124        let q = CrossbeamBlockingQueue::<i32>::new(128);
125        for i in 0..128 {
126            q.enqueue(i);
127        }
128
129        let r = q.try_enqueue(128, Duration::from_millis(0));
130        assert_eq!(r, Some(128));
131        let r = q.try_enqueue(128, Duration::from_millis(10));
132        assert_eq!(r, Some(128));
133    }
134
135    #[test]
136    fn test_fifo() {
137        let q = CrossbeamBlockingQueue::<i32>::new(128);
138        for i in 0..128 {
139            q.enqueue(i);
140        }
141
142        for i in 0..128 {
143            assert_eq!(q.dequeue().unwrap(), i);
144        }
145    }
146
147    #[test]
148    fn test_mpsc() {
149        let q = Arc::new(CrossbeamBlockingQueue::<(i32, i32)>::new(16));
150        let qp1 = q.clone();
151        let qp2 = q.clone();
152        let qc1 = q.clone();
153
154        let p1 = Builder::new()
155            .spawn(
156                move || {
157                    for i in 0..2048 {
158                        qp1.enqueue((1, i));
159                    }
160                }
161            );
162
163        let p2 = Builder::new()
164            .spawn(
165                move || {
166                    for i in 0..2048 {
167                        qp2.enqueue((2, i));
168                    }
169                }
170            );
171
172        let c1 = Builder::new()
173            .spawn(
174                move || {
175                    let mut collector = Vec::<(i32, i32)>::new();
176                    loop {
177                        let element = qc1.dequeue();
178                        collector.push(element.unwrap());
179                        if collector.len() == 4096 {
180                            break collector;
181                        }
182                    }
183                }
184            );
185        p1.unwrap().join().expect("failed to join producer");
186        p2.unwrap().join().expect("failed to join producer");
187
188        let mut collector = c1.unwrap().join().expect("failed to join consumer");
189        for i in 0..2048 {
190            let i1 = collector.iter().position(|e| *e == (1, i)).unwrap();
191            collector.remove(i1);
192            let i2 = collector.iter().position(|e| *e == (2, i)).unwrap();
193            collector.remove(i2);
194        }
195        assert!(collector.is_empty());
196    }
197
198    #[test]
199    fn test_mpmc() {
200        let q = Arc::new(CrossbeamBlockingQueue::<(i32, i32)>::new(16));
201        let qp1 = q.clone();
202        let qp2 = q.clone();
203        let qc1 = q.clone();
204        let qc2 = q.clone();
205
206        let p1 = Builder::new()
207            .spawn(
208                move || {
209                    for i in 0..2048 {
210                        qp1.enqueue((1, i));
211                    }
212                }
213            );
214
215        let p2 = Builder::new()
216            .spawn(
217                move || {
218                    for i in 0..2048 {
219                        qp2.enqueue((2, i));
220                    }
221                }
222            );
223
224        let c1 = Builder::new()
225            .spawn(
226                move || {
227                    let mut collector = Vec::<(i32, i32)>::new();
228                    loop {
229                        let element = qc1.dequeue();
230                        match element {
231                            None => {}
232                            Some((-1, -1)) => {
233                                break collector;
234                            }
235                            Some(e) => {
236                                collector.push(e);
237                            }
238                        }
239                    }
240                }
241            );
242
243        let c2 = Builder::new()
244            .spawn(
245                move || {
246                    let mut collector = Vec::<(i32, i32)>::new();
247                    loop {
248                        let element = qc2.dequeue();
249                        match element {
250                            None => {}
251                            Some((-1, -1)) => {
252                                break collector;
253                            }
254                            Some(e) => {
255                                collector.push(e);
256                            }
257                        }
258                    }
259                }
260            );
261        p1.unwrap().join().expect("failed to join producer");
262        p2.unwrap().join().expect("failed to join producer");
263
264        q.enqueue((-1, -1));
265        q.enqueue((-1, -1));
266
267        let mut collector1 = c1.unwrap().join().expect("failed to join consumer");
268        let mut collector2 = c2.unwrap().join().expect("failed to join consumer");
269
270        let mut collector = Vec::<(i32, i32)>::new();
271        collector.append(&mut collector1);
272        collector.append(&mut collector2);
273
274        for i in 0..2048 {
275            let i1 = collector.iter().position(|e| *e == (1, i)).unwrap();
276            collector.remove(i1);
277            let i2 = collector.iter().position(|e| *e == (2, i)).unwrap();
278            collector.remove(i2);
279        }
280        assert!(collector.is_empty());
281    }
282}