concurrency_traits/queue/impls/
semaphore_queue.rs1use crate::mutex::{Mutex, SpinLock};
2use crate::queue::*;
3use crate::semaphore::*;
4use crate::ThreadFunctions;
5use alloc::boxed::Box;
6use alloc::collections::VecDeque;
7use async_trait::async_trait;
8use core::time::Duration;
9use num::Zero;
10
11#[derive(Debug)]
13pub struct SemaphoreQueue<T, S, CS> {
14 queue: SpinLock<VecDeque<T>, CS>,
15 semaphore: S,
16}
17impl<T, S, CS> SemaphoreQueue<T, S, CS>
18where
19 S: ReadoutSemaphore,
20{
21 pub fn len(&self) -> S::Count {
23 self.semaphore.count()
24 }
25
26 pub fn is_empty(&self) -> bool
28 where
29 S::Count: Zero,
30 {
31 self.semaphore.count().is_zero()
32 }
33}
34impl<T, S, CS> Default for SemaphoreQueue<T, S, CS>
35where
36 S: Default,
37{
38 fn default() -> Self {
39 Self {
40 queue: Default::default(),
41 semaphore: S::default(),
42 }
43 }
44}
45
46impl<T, S, CS> TryQueue for SemaphoreQueue<T, S, CS>
47where
48 S: TrySemaphore,
49 CS: ThreadFunctions,
50{
51 type Item = T;
52
53 fn try_push(&self, value: Self::Item) -> Result<(), Self::Item> {
54 self.queue.lock().push_back(value);
55 self.semaphore.signal();
56 Ok(())
57 }
58
59 fn try_pop(&self) -> Option<Self::Item> {
60 match self.semaphore.try_wait() {
61 true => Some(self.queue.lock().pop_front().unwrap()),
62 false => None,
63 }
64 }
65}
66impl<T, S, CS> Queue for SemaphoreQueue<T, S, CS>
67where
68 S: Semaphore,
69 CS: ThreadFunctions,
70{
71 fn push(&self, value: Self::Item) {
72 self.try_push(value)
73 .unwrap_or_else(|_| panic!("try_push failed!"));
74 }
75
76 fn pop(&self) -> Self::Item {
77 self.semaphore.wait();
78 self.queue.lock().pop_front().unwrap()
79 }
80}
81#[async_trait]
82impl<T, S, CS> AsyncQueue for SemaphoreQueue<T, S, CS>
83where
84 T: Send,
85 S: AsyncSemaphore + Send + Sync,
86 CS: ThreadFunctions,
87{
88 async fn push_async(&self, value: Self::Item) {
89 self.try_push(value)
90 .unwrap_or_else(|_| panic!("try_push failed!"))
91 }
92
93 async fn pop_async(&self) -> Self::Item {
94 self.semaphore.wait_async().await;
95 self.queue.lock().pop_front().unwrap()
96 }
97}
98
99impl<T, S, CS> TryPrependQueue for SemaphoreQueue<T, S, CS>
100where
101 S: TrySemaphore,
102 CS: ThreadFunctions,
103{
104 fn try_push_front(&self, value: Self::Item) -> Result<(), Self::Item> {
105 self.queue.lock().push_front(value);
106 self.semaphore.signal();
107 Ok(())
108 }
109}
110impl<T, S, CS> PrependQueue for SemaphoreQueue<T, S, CS>
111where
112 S: Semaphore,
113 CS: ThreadFunctions,
114{
115 fn push_front(&self, value: Self::Item) {
116 self.try_push_front(value)
117 .unwrap_or_else(|_| panic!("try_push_front failed!"));
118 }
119}
120#[async_trait]
121impl<T, S, CS> AsyncPrependQueue for SemaphoreQueue<T, S, CS>
122where
123 T: Send,
124 S: AsyncSemaphore + Send + Sync,
125 CS: ThreadFunctions,
126{
127 async fn push_front_async(&self, value: Self::Item) {
128 self.try_push_front(value)
129 .unwrap_or_else(|_| panic!("try_push_front failed!"));
130 }
131}
132
133impl<T, S, CS> TryReverseQueue for SemaphoreQueue<T, S, CS>
134where
135 S: TrySemaphore,
136 CS: ThreadFunctions,
137{
138 fn try_pop_back(&self) -> Option<Self::Item> {
139 match self.semaphore.try_wait() {
140 true => Some(self.queue.lock().pop_back().unwrap()),
141 false => None,
142 }
143 }
144}
145impl<T, S, CS> ReverseQueue for SemaphoreQueue<T, S, CS>
146where
147 S: Semaphore,
148 CS: ThreadFunctions,
149{
150 fn pop_back(&self) -> Self::Item {
151 self.try_pop_back().unwrap()
152 }
153}
154#[async_trait]
155impl<T, S, CS> AsyncReverseQueue for SemaphoreQueue<T, S, CS>
156where
157 T: Send,
158 S: AsyncSemaphore + Send + Sync,
159 CS: ThreadFunctions,
160{
161 async fn pop_back_async(&self) -> Self::Item {
162 self.semaphore.wait_async().await;
163 self.queue.lock().pop_back().unwrap()
164 }
165}
166
167impl<T, S, CS> TryDoubleEndedQueue for SemaphoreQueue<T, S, CS>
168where
169 S: TrySemaphore,
170 CS: ThreadFunctions,
171{
172}
173impl<T, S, CS> DoubleEndedQueue for SemaphoreQueue<T, S, CS>
174where
175 S: Semaphore,
176 CS: ThreadFunctions,
177{
178}
179impl<T, S, CS> AsyncDoubleEndedQueue for SemaphoreQueue<T, S, CS>
180where
181 T: Send,
182 S: AsyncSemaphore + Send + Sync,
183 CS: ThreadFunctions,
184{
185}
186
187impl<T, S, CS> TimeoutQueue for SemaphoreQueue<T, S, CS>
188where
189 S: TimeoutSemaphore,
190 CS: ThreadFunctions,
191{
192 fn push_timeout(&self, value: Self::Item, _: Duration) -> Result<(), Self::Item> {
193 self.try_push(value)
194 }
195
196 fn pop_timeout(&self, timeout: Duration) -> Option<Self::Item> {
197 match self.semaphore.wait_timeout(timeout) {
198 true => Some(self.queue.lock().pop_front().unwrap()),
199 false => None,
200 }
201 }
202}
203#[async_trait]
204impl<T, S, CS> AsyncTimeoutQueue for SemaphoreQueue<T, S, CS>
205where
206 T: Send,
207 S: AsyncTimeoutSemaphore + Send + Sync,
208 CS: ThreadFunctions,
209{
210 async fn push_timeout_async(&self, value: Self::Item, _: Duration) -> Result<(), Self::Item> {
211 self.try_push(value)
212 }
213
214 async fn pop_timeout_async(&self, timeout: Duration) -> Option<Self::Item> {
215 match self.semaphore.wait_timeout_async(timeout).await {
216 true => Some(self.queue.lock().pop_front().unwrap()),
217 false => None,
218 }
219 }
220}