concurrency_traits/queue/impls/
semaphore_queue.rs

1use crate::mutex::{Mutex, SpinLock};
2use crate::queue::*;
3use crate::semaphore::*;
4use crate::ThreadFunctions;
5use alloc::boxed::Box;
6use alloc::collections::VecDeque;
7use async_trait::async_trait;
8use core::time::Duration;
9use num::Zero;
10
11/// A queue based on a semaphore to block on.
12#[derive(Debug)]
13pub struct SemaphoreQueue<T, S, CS> {
14    queue: SpinLock<VecDeque<T>, CS>,
15    semaphore: S,
16}
17impl<T, S, CS> SemaphoreQueue<T, S, CS>
18where
19    S: ReadoutSemaphore,
20{
21    /// Gets the length of the queue.
22    pub fn len(&self) -> S::Count {
23        self.semaphore.count()
24    }
25
26    /// Tells whether the queue is empty.
27    pub fn is_empty(&self) -> bool
28    where
29        S::Count: Zero,
30    {
31        self.semaphore.count().is_zero()
32    }
33}
34impl<T, S, CS> Default for SemaphoreQueue<T, S, CS>
35where
36    S: Default,
37{
38    fn default() -> Self {
39        Self {
40            queue: Default::default(),
41            semaphore: S::default(),
42        }
43    }
44}
45
46impl<T, S, CS> TryQueue for SemaphoreQueue<T, S, CS>
47where
48    S: TrySemaphore,
49    CS: ThreadFunctions,
50{
51    type Item = T;
52
53    fn try_push(&self, value: Self::Item) -> Result<(), Self::Item> {
54        self.queue.lock().push_back(value);
55        self.semaphore.signal();
56        Ok(())
57    }
58
59    fn try_pop(&self) -> Option<Self::Item> {
60        match self.semaphore.try_wait() {
61            true => Some(self.queue.lock().pop_front().unwrap()),
62            false => None,
63        }
64    }
65}
66impl<T, S, CS> Queue for SemaphoreQueue<T, S, CS>
67where
68    S: Semaphore,
69    CS: ThreadFunctions,
70{
71    fn push(&self, value: Self::Item) {
72        self.try_push(value)
73            .unwrap_or_else(|_| panic!("try_push failed!"));
74    }
75
76    fn pop(&self) -> Self::Item {
77        self.semaphore.wait();
78        self.queue.lock().pop_front().unwrap()
79    }
80}
81#[async_trait]
82impl<T, S, CS> AsyncQueue for SemaphoreQueue<T, S, CS>
83where
84    T: Send,
85    S: AsyncSemaphore + Send + Sync,
86    CS: ThreadFunctions,
87{
88    async fn push_async(&self, value: Self::Item) {
89        self.try_push(value)
90            .unwrap_or_else(|_| panic!("try_push failed!"))
91    }
92
93    async fn pop_async(&self) -> Self::Item {
94        self.semaphore.wait_async().await;
95        self.queue.lock().pop_front().unwrap()
96    }
97}
98
99impl<T, S, CS> TryPrependQueue for SemaphoreQueue<T, S, CS>
100where
101    S: TrySemaphore,
102    CS: ThreadFunctions,
103{
104    fn try_push_front(&self, value: Self::Item) -> Result<(), Self::Item> {
105        self.queue.lock().push_front(value);
106        self.semaphore.signal();
107        Ok(())
108    }
109}
110impl<T, S, CS> PrependQueue for SemaphoreQueue<T, S, CS>
111where
112    S: Semaphore,
113    CS: ThreadFunctions,
114{
115    fn push_front(&self, value: Self::Item) {
116        self.try_push_front(value)
117            .unwrap_or_else(|_| panic!("try_push_front failed!"));
118    }
119}
120#[async_trait]
121impl<T, S, CS> AsyncPrependQueue for SemaphoreQueue<T, S, CS>
122where
123    T: Send,
124    S: AsyncSemaphore + Send + Sync,
125    CS: ThreadFunctions,
126{
127    async fn push_front_async(&self, value: Self::Item) {
128        self.try_push_front(value)
129            .unwrap_or_else(|_| panic!("try_push_front failed!"));
130    }
131}
132
133impl<T, S, CS> TryReverseQueue for SemaphoreQueue<T, S, CS>
134where
135    S: TrySemaphore,
136    CS: ThreadFunctions,
137{
138    fn try_pop_back(&self) -> Option<Self::Item> {
139        match self.semaphore.try_wait() {
140            true => Some(self.queue.lock().pop_back().unwrap()),
141            false => None,
142        }
143    }
144}
145impl<T, S, CS> ReverseQueue for SemaphoreQueue<T, S, CS>
146where
147    S: Semaphore,
148    CS: ThreadFunctions,
149{
150    fn pop_back(&self) -> Self::Item {
151        self.try_pop_back().unwrap()
152    }
153}
154#[async_trait]
155impl<T, S, CS> AsyncReverseQueue for SemaphoreQueue<T, S, CS>
156where
157    T: Send,
158    S: AsyncSemaphore + Send + Sync,
159    CS: ThreadFunctions,
160{
161    async fn pop_back_async(&self) -> Self::Item {
162        self.semaphore.wait_async().await;
163        self.queue.lock().pop_back().unwrap()
164    }
165}
166
167impl<T, S, CS> TryDoubleEndedQueue for SemaphoreQueue<T, S, CS>
168where
169    S: TrySemaphore,
170    CS: ThreadFunctions,
171{
172}
173impl<T, S, CS> DoubleEndedQueue for SemaphoreQueue<T, S, CS>
174where
175    S: Semaphore,
176    CS: ThreadFunctions,
177{
178}
179impl<T, S, CS> AsyncDoubleEndedQueue for SemaphoreQueue<T, S, CS>
180where
181    T: Send,
182    S: AsyncSemaphore + Send + Sync,
183    CS: ThreadFunctions,
184{
185}
186
187impl<T, S, CS> TimeoutQueue for SemaphoreQueue<T, S, CS>
188where
189    S: TimeoutSemaphore,
190    CS: ThreadFunctions,
191{
192    fn push_timeout(&self, value: Self::Item, _: Duration) -> Result<(), Self::Item> {
193        self.try_push(value)
194    }
195
196    fn pop_timeout(&self, timeout: Duration) -> Option<Self::Item> {
197        match self.semaphore.wait_timeout(timeout) {
198            true => Some(self.queue.lock().pop_front().unwrap()),
199            false => None,
200        }
201    }
202}
203#[async_trait]
204impl<T, S, CS> AsyncTimeoutQueue for SemaphoreQueue<T, S, CS>
205where
206    T: Send,
207    S: AsyncTimeoutSemaphore + Send + Sync,
208    CS: ThreadFunctions,
209{
210    async fn push_timeout_async(&self, value: Self::Item, _: Duration) -> Result<(), Self::Item> {
211        self.try_push(value)
212    }
213
214    async fn pop_timeout_async(&self, timeout: Duration) -> Option<Self::Item> {
215        match self.semaphore.wait_timeout_async(timeout).await {
216            true => Some(self.queue.lock().pop_front().unwrap()),
217            false => None,
218        }
219    }
220}