1#![allow(dead_code)]
2
3use std::{
4 collections::VecDeque,
5 pin::Pin,
6 sync::{
7 atomic::{AtomicBool, Ordering},
8 Arc,
9 },
10 task::{Context, Poll, Waker},
11};
12
13use futures::stream::Stream;
14use parking_lot::Mutex;
15
16#[derive(Debug)]
20struct ReceiverNotifier {
21 handle: Waker,
22 awake: Arc<AtomicBool>,
23}
24
25struct RawDeque<T> {
42 front_values: VecDeque<T>,
43 back_values: VecDeque<T>,
44 rx_notifiers: VecDeque<ReceiverNotifier>,
45}
46
47impl<T> RawDeque<T> {
48 const fn new() -> Self {
49 Self {
50 front_values: VecDeque::new(),
51 back_values: VecDeque::new(),
52 rx_notifiers: VecDeque::new(),
53 }
54 }
55}
56
57impl<T> RawDeque<T> {
58 fn notify_rx(&mut self) {
60 if let Some(n) = self.rx_notifiers.pop_front() {
61 n.handle.wake();
62 n.awake.store(true, Ordering::Relaxed);
63 }
64 }
65}
66
67pub struct StreamableDeque<T> {
70 inner: Mutex<RawDeque<T>>,
71}
72
73impl<T> std::fmt::Debug for StreamableDeque<T> {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("StreamableDeque { ... }").finish()
76 }
77}
78
79impl<T> Default for StreamableDeque<T> {
80 fn default() -> Self {
81 Self {
82 inner: Mutex::new(RawDeque::new()),
83 }
84 }
85}
86
87impl<T> StreamableDeque<T> {
88 #[must_use]
89 pub fn new() -> Self {
90 Self::default()
91 }
92
93 pub fn push_front(&self, item: T) {
95 let mut inner = self.inner.lock();
96 inner.front_values.push_back(item);
97 inner.notify_rx();
99 }
100
101 pub fn push_back(&self, item: T) {
103 let mut inner = self.inner.lock();
104 inner.back_values.push_back(item);
105 inner.notify_rx();
107 }
108
109 pub const fn stream(&self) -> StreamReceiver<T> {
112 StreamReceiver {
113 queue: self,
114 awake: None,
115 }
116 }
117
118 pub fn pop_front(&self) -> Option<T> {
119 let mut inner = self.inner.lock();
120 inner
121 .front_values
122 .pop_front()
123 .or_else(|| inner.back_values.pop_front())
124 }
125
126 #[cfg(test)]
127 pub(crate) fn pop_back(&self) -> Option<T> {
128 let mut inner = self.inner.lock();
129 inner
130 .back_values
131 .pop_back()
132 .or_else(|| inner.front_values.pop_back())
133 }
134}
135
136pub struct StreamReceiver<'a, T> {
138 queue: &'a StreamableDeque<T>,
139 awake: Option<Arc<AtomicBool>>,
140}
141
142impl<T> Stream for StreamReceiver<'_, T> {
143 type Item = T;
144
145 fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
146 let mut inner = self.queue.inner.lock();
147
148 let value = inner
149 .front_values
150 .pop_front()
151 .or_else(|| inner.back_values.pop_front());
152
153 if let Some(v) = value {
154 self.awake = None;
155 Poll::Ready(Some(v))
156 } else {
157 let awake = Arc::new(AtomicBool::new(false));
159 inner.rx_notifiers.push_back(ReceiverNotifier {
161 handle: ctx.waker().clone(),
162 awake: awake.clone(),
163 });
164 self.awake = Some(awake);
165 drop(inner);
166 Poll::Pending
167 }
168 }
169}
170
171impl<T> Drop for StreamReceiver<'_, T> {
172 fn drop(&mut self) {
174 let awake = self.awake.take().map(|w| w.load(Ordering::Relaxed));
175
176 if awake == Some(true) {
177 let mut queue_wakers = self.queue.inner.lock();
178 if let Some(n) = queue_wakers.rx_notifiers.pop_front() {
180 n.awake.store(true, Ordering::Relaxed);
181 n.handle.wake();
182 }
183 }
184 }
185}
186
187#[cfg(test)]
188mod tests {
189 use futures::stream::StreamExt;
190
191 use super::*;
192
193 #[tokio::test]
194 async fn streamable_deque() {
195 let queue = Arc::new(StreamableDeque::<i32>::new());
196
197 let pos_queue = queue.clone();
198 tokio::spawn(async move {
199 for i in 0..=10 {
200 pos_queue.push_back(i);
201 }
202 });
203
204 let neg_queue = queue.clone();
205 tokio::spawn(async move {
206 for i in -10..=-1 {
207 neg_queue.push_front(i);
208 }
209 });
210
211 let mut rx_vec = vec![];
212
213 let mut stream = queue.stream().enumerate();
214 while let Some((i, v)) = stream.next().await {
215 rx_vec.push(v);
216 if i >= 20 {
217 break;
218 }
219 }
220
221 let expected_vec: Vec<i32> = (-10..=10).collect();
224 assert_eq!(expected_vec, rx_vec);
225 }
226}