s2n_quic_core/sync/spsc/
recv.rs1use super::{state::Side, Cursor, Result, State};
5use core::{
6 future::Future,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11#[derive(Debug)]
12pub struct Receiver<T>(pub(super) State<T>);
13
14impl<T> Receiver<T> {
15 #[inline]
16 pub fn capacity(&self) -> usize {
17 self.0.cursor.capacity()
18 }
19
20 #[inline]
21 pub fn len(&self) -> usize {
22 self.0.cursor.recv_len()
23 }
24
25 #[inline]
26 pub fn is_empty(&self) -> bool {
27 self.0.cursor.is_empty()
28 }
29
30 #[inline]
31 pub fn is_full(&self) -> bool {
32 self.0.cursor.is_full()
33 }
34
35 #[inline]
39 pub fn slice(&mut self) -> RecvSlice<'_, T> {
40 let cursor = self.0.cursor;
41 RecvSlice(&mut self.0, cursor)
42 }
43
44 #[inline]
46 pub async fn acquire(&mut self) -> Result<()> {
47 Acquire { receiver: self }.await
48 }
49
50 #[inline]
51 pub fn poll_slice(&mut self, cx: &mut Context) -> Poll<Result<RecvSlice<'_, T>>> {
52 macro_rules! acquire_filled {
53 () => {
54 match self.0.acquire_filled() {
55 Ok(true) => {
56 let cursor = self.0.cursor;
57 return Ok(RecvSlice(&mut self.0, cursor)).into();
58 }
59 Ok(false) => {
60 }
62 Err(err) => {
63 return Err(err).into();
65 }
66 }
67 };
68 }
69
70 acquire_filled!();
72
73 self.0.receiver.register(cx.waker());
75
76 acquire_filled!();
78
79 Poll::Pending
80 }
81
82 #[inline]
83 pub fn try_slice(&mut self) -> Result<Option<RecvSlice<'_, T>>> {
84 Ok(if self.0.acquire_filled()? {
85 let cursor = self.0.cursor;
86 Some(RecvSlice(&mut self.0, cursor))
87 } else {
88 None
89 })
90 }
91}
92
93impl<T> Drop for Receiver<T> {
94 #[inline]
95 fn drop(&mut self) {
96 self.0.close(Side::Receiver);
97 }
98}
99
100pub struct RecvSlice<'a, T>(&'a mut State<T>, Cursor);
101
102impl<T> RecvSlice<'_, T> {
103 #[inline]
104 pub fn peek(&mut self) -> (&mut [T], &mut [T]) {
105 let _ = self.0.acquire_filled();
106 let (slice, _) = self.0.as_pairs();
107 unsafe {
108 slice.assume_init().into_mut()
110 }
111 }
112
113 #[inline]
114 pub fn pop(&mut self) -> Option<T> {
115 if self.0.cursor.is_empty() && !self.0.acquire_filled().unwrap_or(false) {
116 return None;
117 }
118
119 let (pair, _) = self.0.as_pairs();
120 let value = unsafe {
121 pair.take(0)
123 };
124 self.0.cursor.increment_head(1);
125 Some(value)
126 }
127
128 #[inline]
129 pub fn clear(&mut self) -> usize {
130 let (pair, _) = self.0.as_pairs();
133 let len = pair.len();
134
135 for entry in pair.iter() {
136 unsafe {
137 let _ = entry.take();
139 }
140 }
141
142 self.0.cursor.increment_head(len);
143
144 len
145 }
146
147 #[inline]
149 pub fn release(&mut self, len: usize) {
150 let (pair, _) = self.0.as_pairs();
151
152 debug_assert!(pair.len() >= len, "cannot release more than was acquired");
153
154 for entry in pair.iter().take(len) {
155 unsafe {
156 let _ = entry.take();
158 }
159 }
160
161 self.0.cursor.increment_head(len);
162 }
163
164 #[inline]
165 pub fn len(&self) -> usize {
166 self.0.cursor.recv_len()
167 }
168
169 #[inline]
170 pub fn is_empty(&self) -> bool {
171 self.0.cursor.is_empty()
172 }
173
174 #[inline]
178 pub fn sync(&mut self) -> Result<(), super::ClosedError> {
179 self.0.acquire_filled()?;
180 Ok(())
181 }
182}
183
184impl<T> Drop for RecvSlice<'_, T> {
185 #[inline]
186 fn drop(&mut self) {
187 self.0.persist_head(self.1);
188 }
189}
190
191struct Acquire<'a, T> {
192 receiver: &'a mut Receiver<T>,
193}
194
195impl<T> Future for Acquire<'_, T> {
196 type Output = Result<()>;
197
198 #[inline]
199 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
200 match self.receiver.poll_slice(cx) {
201 Poll::Ready(v) => Poll::Ready(v.map(|_| ())),
202 Poll::Pending => Poll::Pending,
203 }
204 }
205}