// s2n_quic_core/sync/spsc/recv.rs

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
3
4use super::{state::Side, Cursor, Result, State};
5use core::{
6    future::Future,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11#[derive(Debug)]
12pub struct Receiver<T>(pub(super) State<T>);
13
14impl<T> Receiver<T> {
15    #[inline]
16    pub fn capacity(&self) -> usize {
17        self.0.cursor.capacity()
18    }
19
20    #[inline]
21    pub fn len(&self) -> usize {
22        self.0.cursor.recv_len()
23    }
24
25    #[inline]
26    pub fn is_empty(&self) -> bool {
27        self.0.cursor.is_empty()
28    }
29
30    #[inline]
31    pub fn is_full(&self) -> bool {
32        self.0.cursor.is_full()
33    }
34
35    /// Returns the currently acquired slice of entries for the receiver
36    ///
37    /// Callers should call [`Self::acquire`] or [`Self::poll_slice`] before calling this method.
38    #[inline]
39    pub fn slice(&mut self) -> RecvSlice<'_, T> {
40        let cursor = self.0.cursor;
41        RecvSlice(&mut self.0, cursor)
42    }
43
44    /// Blocks until at least one entry is available for consumption
45    #[inline]
46    pub async fn acquire(&mut self) -> Result<()> {
47        Acquire { receiver: self }.await
48    }
49
50    #[inline]
51    pub fn poll_slice(&mut self, cx: &mut Context) -> Poll<Result<RecvSlice<'_, T>>> {
52        macro_rules! acquire_filled {
53            () => {
54                match self.0.acquire_filled() {
55                    Ok(true) => {
56                        let cursor = self.0.cursor;
57                        return Ok(RecvSlice(&mut self.0, cursor)).into();
58                    }
59                    Ok(false) => {
60                        // the queue is full
61                    }
62                    Err(err) => {
63                        // the channel was closed
64                        return Err(err).into();
65                    }
66                }
67            };
68        }
69
70        // check capacity before registering a waker
71        acquire_filled!();
72
73        // register the waker
74        self.0.receiver.register(cx.waker());
75
76        // check once more to avoid a loss of notification
77        acquire_filled!();
78
79        Poll::Pending
80    }
81
82    #[inline]
83    pub fn try_slice(&mut self) -> Result<Option<RecvSlice<'_, T>>> {
84        Ok(if self.0.acquire_filled()? {
85            let cursor = self.0.cursor;
86            Some(RecvSlice(&mut self.0, cursor))
87        } else {
88            None
89        })
90    }
91}
92
93impl<T> Drop for Receiver<T> {
94    #[inline]
95    fn drop(&mut self) {
96        self.0.close(Side::Receiver);
97    }
98}
99
100pub struct RecvSlice<'a, T>(&'a mut State<T>, Cursor);
101
102impl<T> RecvSlice<'_, T> {
103    #[inline]
104    pub fn peek(&mut self) -> (&mut [T], &mut [T]) {
105        let _ = self.0.acquire_filled();
106        let (slice, _) = self.0.as_pairs();
107        unsafe {
108            // Safety: the first pair of returned slices is the `initialized` half
109            slice.assume_init().into_mut()
110        }
111    }
112
113    #[inline]
114    pub fn pop(&mut self) -> Option<T> {
115        if self.0.cursor.is_empty() && !self.0.acquire_filled().unwrap_or(false) {
116            return None;
117        }
118
119        let (pair, _) = self.0.as_pairs();
120        let value = unsafe {
121            // Safety: the state's cursor indicates that the first slot contains initialized data
122            pair.take(0)
123        };
124        self.0.cursor.increment_head(1);
125        Some(value)
126    }
127
128    #[inline]
129    pub fn clear(&mut self) -> usize {
130        // don't try to `acquire_filled` so the caller can observe any updates through peek/pop
131
132        let (pair, _) = self.0.as_pairs();
133        let len = pair.len();
134
135        for entry in pair.iter() {
136            unsafe {
137                // Safety: the state's cursor indicates that each slot in the `iter` contains data
138                let _ = entry.take();
139            }
140        }
141
142        self.0.cursor.increment_head(len);
143
144        len
145    }
146
147    /// Releases `len` entries back to the sender
148    #[inline]
149    pub fn release(&mut self, len: usize) {
150        let (pair, _) = self.0.as_pairs();
151
152        debug_assert!(pair.len() >= len, "cannot release more than was acquired");
153
154        for entry in pair.iter().take(len) {
155            unsafe {
156                // Safety: the state's cursor indicates that each slot in the `iter` contains data
157                let _ = entry.take();
158            }
159        }
160
161        self.0.cursor.increment_head(len);
162    }
163
164    #[inline]
165    pub fn len(&self) -> usize {
166        self.0.cursor.recv_len()
167    }
168
169    #[inline]
170    pub fn is_empty(&self) -> bool {
171        self.0.cursor.is_empty()
172    }
173
174    /// Synchronizes any updates from the sender
175    ///
176    /// This can be useful for when `slice` is called without polling for entries first.
177    #[inline]
178    pub fn sync(&mut self) -> Result<(), super::ClosedError> {
179        self.0.acquire_filled()?;
180        Ok(())
181    }
182}
183
184impl<T> Drop for RecvSlice<'_, T> {
185    #[inline]
186    fn drop(&mut self) {
187        self.0.persist_head(self.1);
188    }
189}
190
191struct Acquire<'a, T> {
192    receiver: &'a mut Receiver<T>,
193}
194
195impl<T> Future for Acquire<'_, T> {
196    type Output = Result<()>;
197
198    #[inline]
199    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
200        match self.receiver.poll_slice(cx) {
201            Poll::Ready(v) => Poll::Ready(v.map(|_| ())),
202            Poll::Pending => Poll::Pending,
203        }
204    }
205}