s2n_quic_core/sync/spsc/
send.rs

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
3
4use super::{state::Side, Cursor, PushError, Result, State};
5use core::{
6    future::Future,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11#[derive(Debug)]
12pub struct Sender<T>(pub(super) State<T>);
13
14impl<T> Sender<T> {
15    #[inline]
16    pub fn capacity(&self) -> usize {
17        self.0.cursor.capacity()
18    }
19
20    /// Returns the currently acquired slice of entries for the sender
21    ///
22    /// Callers should call [`Self::acquire`] or [`Self::poll_slice`] before calling this method.
23    #[inline]
24    pub fn slice(&mut self) -> SendSlice<'_, T> {
25        let cursor = self.0.cursor;
26        SendSlice(&mut self.0, cursor)
27    }
28
29    /// Blocks until at least one entry is available for sending
30    #[inline]
31    pub async fn acquire(&mut self) -> Result<()> {
32        Acquire { sender: self }.await
33    }
34
35    #[inline]
36    pub fn poll_slice(&mut self, cx: &mut Context) -> Poll<Result<SendSlice<'_, T>>> {
37        macro_rules! acquire_capacity {
38            () => {
39                match self.0.acquire_capacity() {
40                    Ok(true) => {
41                        let cursor = self.0.cursor;
42                        return Ok(SendSlice(&mut self.0, cursor)).into();
43                    }
44                    Ok(false) => {
45                        // the queue is full
46                    }
47                    Err(err) => {
48                        // the channel was closed
49                        return Err(err).into();
50                    }
51                }
52            };
53        }
54
55        // check capacity before registering a waker
56        acquire_capacity!();
57
58        // register the waker
59        self.0.sender.register(cx.waker());
60
61        // check once more to avoid a loss of notification
62        acquire_capacity!();
63
64        Poll::Pending
65    }
66
67    #[inline]
68    pub fn try_slice(&mut self) -> Result<Option<SendSlice<'_, T>>> {
69        Ok(if self.0.acquire_capacity()? {
70            let cursor = self.0.cursor;
71            Some(SendSlice(&mut self.0, cursor))
72        } else {
73            None
74        })
75    }
76}
77
78impl<T> Drop for Sender<T> {
79    #[inline]
80    fn drop(&mut self) {
81        self.0.close(Side::Sender);
82    }
83}
84
85#[derive(Debug)]
86pub struct SendSlice<'a, T>(&'a mut State<T>, Cursor);
87
88impl<T> SendSlice<'_, T> {
89    #[inline]
90    pub fn push(&mut self, value: T) -> Result<(), PushError<T>> {
91        if self.0.cursor.is_full() && !self.0.acquire_capacity()? {
92            return Err(PushError::Full(value));
93        }
94
95        let (_, pair) = self.0.as_pairs();
96
97        unsafe {
98            // Safety: the second pair of slices contains uninitialized memory and the cursor
99            // indicates we have capacity to write at least one value
100            pair.write(0, value);
101        }
102
103        self.0.cursor.increment_tail(1);
104
105        Ok(())
106    }
107
108    pub fn extend<I: Iterator<Item = T>>(&mut self, iter: &mut I) -> Result<()> {
109        if self.0.acquire_capacity()? {
110            let (_, pair) = self.0.as_pairs();
111
112            let mut idx = 0;
113            let capacity = self.capacity();
114
115            while idx < capacity {
116                if let Some(value) = iter.next() {
117                    unsafe {
118                        // Safety: the second pair of slices contains uninitialized memory
119                        pair.write(idx, value);
120                    }
121                    idx += 1;
122                } else {
123                    break;
124                }
125            }
126
127            self.0.cursor.increment_tail(idx);
128        }
129
130        Ok(())
131    }
132
133    #[inline]
134    pub fn capacity(&self) -> usize {
135        self.0.cursor.send_capacity()
136    }
137
138    /// Synchronizes any updates from the receiver
139    ///
140    /// This can be useful for when `slice` is called without polling for entries first.
141    #[inline]
142    pub fn sync(&mut self) -> Result<(), super::ClosedError> {
143        self.0.acquire_capacity()?;
144        Ok(())
145    }
146}
147
148impl<T> Drop for SendSlice<'_, T> {
149    #[inline]
150    fn drop(&mut self) {
151        self.0.persist_tail(self.1);
152    }
153}
154
155struct Acquire<'a, T> {
156    sender: &'a mut Sender<T>,
157}
158
159impl<T> Future for Acquire<'_, T> {
160    type Output = Result<()>;
161
162    #[inline]
163    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
164        match self.sender.poll_slice(cx) {
165            Poll::Ready(v) => Poll::Ready(v.map(|_| ())),
166            Poll::Pending => Poll::Pending,
167        }
168    }
169}