command_executor/
blocking_queue_adapter.rs

1use std::time::Duration;
2
3use crate::blocking_queue::BlockingQueue;
4use crate::crossbeam_blocking_queue::CrossbeamBlockingQueue;
5use crate::queue_type::QueueType;
6
7/// Wrapper over available queue types.
8pub enum BlockingQueueAdapter<E> where E: Send + Sync {
9    BlockingQueue {
10        blocking_queue: BlockingQueue<E>,
11    },
12    CrossbeamBlockingQueue {
13        crossbeam_blocking_queue: CrossbeamBlockingQueue<E>
14    },
15}
16
17impl<E> BlockingQueueAdapter<E> where E: Send + Sync {
18    pub fn new(queue_type: QueueType, size: usize) -> BlockingQueueAdapter::<E> {
19        match queue_type {
20            QueueType::BlockingQueue => {
21                BlockingQueueAdapter::BlockingQueue {
22                    blocking_queue: BlockingQueue::new(size)
23                }
24            }
25            QueueType::CrossbeamBlockingQueue => {
26                BlockingQueueAdapter::CrossbeamBlockingQueue {
27                    crossbeam_blocking_queue: CrossbeamBlockingQueue::new(size)
28                }
29            }
30        }
31    }
32
33    pub fn len(&self) -> usize {
34        match self {
35            BlockingQueueAdapter::BlockingQueue { blocking_queue } => {
36                blocking_queue.len()
37            }
38            BlockingQueueAdapter::CrossbeamBlockingQueue { crossbeam_blocking_queue } => {
39                crossbeam_blocking_queue.len()
40            }
41        }
42    }
43
44    pub fn capacity(&self) -> usize {
45        match self {
46            BlockingQueueAdapter::BlockingQueue { blocking_queue } => {
47                blocking_queue.capacity()
48            }
49            BlockingQueueAdapter::CrossbeamBlockingQueue { crossbeam_blocking_queue } => {
50                crossbeam_blocking_queue.capacity()
51            }
52        }
53    }
54
55    pub fn is_empty(&self) -> bool {
56        match self {
57            BlockingQueueAdapter::BlockingQueue { blocking_queue } => {
58                blocking_queue.is_empty()
59            }
60            BlockingQueueAdapter::CrossbeamBlockingQueue { crossbeam_blocking_queue } => {
61                crossbeam_blocking_queue.is_empty()
62            }
63        }
64    }
65
66    pub fn is_full(&self) -> bool {
67        match self {
68            BlockingQueueAdapter::BlockingQueue { blocking_queue } => {
69                blocking_queue.is_full()
70            }
71            BlockingQueueAdapter::CrossbeamBlockingQueue { crossbeam_blocking_queue } => {
72                crossbeam_blocking_queue.is_full()
73            }
74        }
75    }
76
77    pub fn wait_empty(&self, timeout: Duration) -> bool {
78        match self {
79            BlockingQueueAdapter::BlockingQueue { blocking_queue } => {
80                blocking_queue.wait_empty(timeout)
81            }
82            BlockingQueueAdapter::CrossbeamBlockingQueue { crossbeam_blocking_queue } => {
83                crossbeam_blocking_queue.wait_empty(timeout)
84            }
85        }
86    }
87
88    pub fn enqueue(&self, element: E) {
89        match self {
90            BlockingQueueAdapter::BlockingQueue { blocking_queue } => {
91                blocking_queue.enqueue(element)
92            }
93            BlockingQueueAdapter::CrossbeamBlockingQueue { crossbeam_blocking_queue } => {
94                crossbeam_blocking_queue.enqueue(element)
95            }
96        }
97    }
98
99    pub fn try_enqueue(&self, element: E, timeout: Duration) -> Option<E> {
100        match self {
101            BlockingQueueAdapter::BlockingQueue { blocking_queue } => {
102                blocking_queue.try_enqueue(element, timeout)
103            }
104            BlockingQueueAdapter::CrossbeamBlockingQueue { crossbeam_blocking_queue } => {
105                crossbeam_blocking_queue.try_enqueue(element, timeout)
106            }
107        }
108    }
109
110    pub fn dequeue(&self) -> Option<E> {
111        match self {
112            BlockingQueueAdapter::BlockingQueue { blocking_queue } => {
113                blocking_queue.dequeue()
114            }
115            BlockingQueueAdapter::CrossbeamBlockingQueue { crossbeam_blocking_queue } => {
116                crossbeam_blocking_queue.dequeue()
117            }
118        }
119    }
120
121    pub fn try_dequeue(&self, timeout: Duration) -> Option<E> {
122        match self {
123            BlockingQueueAdapter::BlockingQueue { blocking_queue } => {
124                blocking_queue.try_dequeue(timeout)
125            }
126            BlockingQueueAdapter::CrossbeamBlockingQueue { crossbeam_blocking_queue } => {
127                crossbeam_blocking_queue.try_dequeue(timeout)
128            }
129        }
130    }
131}