s2n_quic_core/sync/spsc/
send.rs1use super::{state::Side, Cursor, PushError, Result, State};
5use core::{
6 future::Future,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
/// Sending handle of the channel.
///
/// A thin tuple wrapper around the channel [`State`]; the field is
/// `pub(super)` so the parent module can build the handle from a state value.
/// Dropping the handle closes the channel from the sender side (see the
/// `Drop` implementation for this type).
#[derive(Debug)]
pub struct Sender<T>(pub(super) State<T>);
13
14impl<T> Sender<T> {
15 #[inline]
16 pub fn capacity(&self) -> usize {
17 self.0.cursor.capacity()
18 }
19
20 #[inline]
24 pub fn slice(&mut self) -> SendSlice<'_, T> {
25 let cursor = self.0.cursor;
26 SendSlice(&mut self.0, cursor)
27 }
28
29 #[inline]
31 pub async fn acquire(&mut self) -> Result<()> {
32 Acquire { sender: self }.await
33 }
34
35 #[inline]
36 pub fn poll_slice(&mut self, cx: &mut Context) -> Poll<Result<SendSlice<'_, T>>> {
37 macro_rules! acquire_capacity {
38 () => {
39 match self.0.acquire_capacity() {
40 Ok(true) => {
41 let cursor = self.0.cursor;
42 return Ok(SendSlice(&mut self.0, cursor)).into();
43 }
44 Ok(false) => {
45 }
47 Err(err) => {
48 return Err(err).into();
50 }
51 }
52 };
53 }
54
55 acquire_capacity!();
57
58 self.0.sender.register(cx.waker());
60
61 acquire_capacity!();
63
64 Poll::Pending
65 }
66
67 #[inline]
68 pub fn try_slice(&mut self) -> Result<Option<SendSlice<'_, T>>> {
69 Ok(if self.0.acquire_capacity()? {
70 let cursor = self.0.cursor;
71 Some(SendSlice(&mut self.0, cursor))
72 } else {
73 None
74 })
75 }
76}
77
78impl<T> Drop for Sender<T> {
79 #[inline]
80 fn drop(&mut self) {
81 self.0.close(Side::Sender);
82 }
83}
84
/// Borrowed view used to push entries through a [`Sender`].
///
/// Field `0` is the exclusive borrow of the sender's [`State`]; field `1` is
/// the [`Cursor`] value copied out of the state when the slice was created,
/// which is passed to `State::persist_tail` when the slice is dropped.
#[derive(Debug)]
pub struct SendSlice<'a, T>(&'a mut State<T>, Cursor);
87
88impl<T> SendSlice<'_, T> {
89 #[inline]
90 pub fn push(&mut self, value: T) -> Result<(), PushError<T>> {
91 if self.0.cursor.is_full() && !self.0.acquire_capacity()? {
92 return Err(PushError::Full(value));
93 }
94
95 let (_, pair) = self.0.as_pairs();
96
97 unsafe {
98 pair.write(0, value);
101 }
102
103 self.0.cursor.increment_tail(1);
104
105 Ok(())
106 }
107
108 pub fn extend<I: Iterator<Item = T>>(&mut self, iter: &mut I) -> Result<()> {
109 if self.0.acquire_capacity()? {
110 let (_, pair) = self.0.as_pairs();
111
112 let mut idx = 0;
113 let capacity = self.capacity();
114
115 while idx < capacity {
116 if let Some(value) = iter.next() {
117 unsafe {
118 pair.write(idx, value);
120 }
121 idx += 1;
122 } else {
123 break;
124 }
125 }
126
127 self.0.cursor.increment_tail(idx);
128 }
129
130 Ok(())
131 }
132
133 #[inline]
134 pub fn capacity(&self) -> usize {
135 self.0.cursor.send_capacity()
136 }
137
138 #[inline]
142 pub fn sync(&mut self) -> Result<(), super::ClosedError> {
143 self.0.acquire_capacity()?;
144 Ok(())
145 }
146}
147
148impl<T> Drop for SendSlice<'_, T> {
149 #[inline]
150 fn drop(&mut self) {
151 self.0.persist_tail(self.1);
152 }
153}
154
/// Future returned (via `.await`) by [`Sender::acquire`].
///
/// Holds the exclusive borrow of the sender for as long as the future lives.
struct Acquire<'a, T> {
    /// Sender that is polled for capacity on every `poll` call.
    sender: &'a mut Sender<T>,
}
158
159impl<T> Future for Acquire<'_, T> {
160 type Output = Result<()>;
161
162 #[inline]
163 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
164 match self.sender.poll_slice(cx) {
165 Poll::Ready(v) => Poll::Ready(v.map(|_| ())),
166 Poll::Pending => Poll::Pending,
167 }
168 }
169}