s2n_quic/connection/acceptor.rs
1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5 connection,
6 stream::{BidirectionalStream, PeerStream, ReceiveStream},
7};
8
9macro_rules! impl_accept_api {
10 () => {
11 /// Accepts an incoming [`PeerStream`](`crate::stream::PeerStream`)
12 ///
13 /// The method will return
14 /// - `Ok(Some(stream)))` if a [`PeerStream`](`crate::stream::PeerStream`) was accepted
15 /// - `Ok(None)` if the connection was closed without an error
16 /// - `Err(stream_error)` if no stream could be accepted due to an error
17 ///
18 /// # Examples
19 ///
20 /// ```rust,no_run
21 /// # async fn test() -> s2n_quic::stream::Result<()> {
22 /// # let mut acceptor: s2n_quic::connection::StreamAcceptor = todo!();
23 /// #
24 /// while let Some(stream) = acceptor.accept().await? {
25 /// println!("Stream opened from {:?}", stream.connection().remote_addr());
26 /// }
27 /// #
28 /// # Ok(())
29 /// # }
30 /// ```
31 #[inline]
32 pub async fn accept(
33 &mut self,
34 ) -> crate::connection::Result<Option<crate::stream::PeerStream>> {
35 futures::future::poll_fn(|cx| self.poll_accept(cx)).await
36 }
37
38 /// Poll for accepting an incoming [`PeerStream`](`crate::stream::PeerStream`)
39 ///
40 /// The method will return
41 /// - `Poll::Ready(Ok(Some(stream)))` if a [`PeerStream`](`crate::stream::PeerStream`) was accepted
42 /// - `Poll::Ready(Ok(None))` if the connection was closed without an error
43 /// - `Poll::Ready(Err(stream_error))` if no stream could be accepted due to an error
44 /// - `Poll::Pending` if no new [`PeerStream`](`crate::stream::PeerStream`) was accepted by the connection yet.
45 /// In this case the caller must retry calling [`Self::poll_accept`].
46 /// For this purpose the method will save the [`core::task::Waker`]
47 /// which is provided as part of the [`core::task::Context`] parameter, and notify it
48 /// as soon as retrying the method will yield a different result.
49 #[inline]
50 pub fn poll_accept(
51 &mut self,
52 cx: &mut core::task::Context,
53 ) -> core::task::Poll<crate::connection::Result<Option<crate::stream::PeerStream>>> {
54 use s2n_quic_core::stream::StreamType;
55 use $crate::stream::{BidirectionalStream, ReceiveStream};
56
57 Ok(
58 core::task::ready!(self.0.poll_accept(None, cx))?.map(|stream| {
59 match stream.id().stream_type() {
60 StreamType::Unidirectional => ReceiveStream::new(stream.into()).into(),
61 StreamType::Bidirectional => BidirectionalStream::new(stream).into(),
62 }
63 }),
64 )
65 .into()
66 }
67
68 impl_accept_bidirectional_api!();
69 impl_accept_receive_api!();
70 };
71}
72
73macro_rules! impl_accept_bidirectional_api {
74 () => {
75 /// Accepts an incoming [`BidirectionalStream`](`crate::stream::BidirectionalStream`)
76 ///
77 /// The method will return
78 /// - `Ok(Some(stream)))` if a [`BidirectionalStream`](`crate::stream::BidirectionalStream`) was accepted
79 /// - `Ok(None)` if the connection was closed without an error
80 /// - `Err(stream_error)` if no stream could be accepted due to an error
81 ///
82 /// # Examples
83 ///
84 /// ```rust,no_run
85 /// # async fn test() -> s2n_quic::stream::Result<()> {
86 /// # let mut acceptor: s2n_quic::connection::StreamAcceptor = todo!();
87 /// #
88 /// while let Ok(Some(mut stream)) = acceptor.accept_bidirectional_stream().await {
89 /// println!("Stream opened from {:?}", stream.connection().remote_addr());
90 /// }
91 /// #
92 /// # Ok(())
93 /// # }
94 /// ```
95 #[inline]
96 pub async fn accept_bidirectional_stream(
97 &mut self,
98 ) -> $crate::connection::Result<Option<$crate::stream::BidirectionalStream>> {
99 futures::future::poll_fn(|cx| self.poll_accept_bidirectional_stream(cx)).await
100 }
101
102 /// Poll for accepting an incoming [`BidirectionalStream`](`crate::stream::BidirectionalStream`)
103 ///
104 /// The method will return
105 /// - `Poll::Ready(Ok(Some(stream)))` if a [`BidirectionalStream`](`crate::stream::BidirectionalStream`) was accepted
106 /// - `Poll::Ready(Ok(None))` if the connection was closed without an error
107 /// - `Poll::Ready(Err(stream_error))` if no stream could be accepted due to an error
108 /// - `Poll::Pending` if no new [`BidirectionalStream`](`crate::stream::BidirectionalStream`) was accepted by the connection yet.
109 /// In this case the caller must retry calling [`Self::poll_accept_bidirectional_stream`].
110 /// For this purpose the method will save the [`core::task::Waker`]
111 /// which is provided as part of the [`core::task::Context`] parameter, and notify it
112 /// as soon as retrying the method will yield a different result.
113 #[inline]
114 pub fn poll_accept_bidirectional_stream(
115 &mut self,
116 cx: &mut core::task::Context,
117 ) -> core::task::Poll<$crate::connection::Result<Option<$crate::stream::BidirectionalStream>>> {
118 Ok(
119 core::task::ready!(self
120 .0
121 .poll_accept(Some(s2n_quic_core::stream::StreamType::Bidirectional), cx)
122 )?.map($crate::stream::BidirectionalStream::new)
123 ).into()
124 }
125 };
126}
127
128macro_rules! impl_accept_receive_api {
129 () => {
130 /// Accepts an incoming [`ReceiveStream`](`crate::stream::ReceiveStream`)
131 ///
132 /// The method will return
133 /// - `Ok(Some(stream)))` if a [`ReceiveStream`](`crate::stream::ReceiveStream`) was accepted
134 /// - `Ok(None)` if the connection was closed without an error
135 /// - `Err(stream_error)` if no stream could be accepted due to an error
136 ///
137 /// # Examples
138 ///
139 /// ```rust,no_run
140 /// # async fn test() -> s2n_quic::stream::Result<()> {
141 /// # let mut acceptor: s2n_quic::connection::StreamAcceptor = todo!();
142 /// #
143 /// while let Ok(Some(mut stream)) = acceptor.accept_receive_stream().await {
144 /// println!("Stream opened from {:?}", stream.connection().remote_addr());
145 /// }
146 /// #
147 /// # Ok(())
148 /// # }
149 /// ```
150 #[inline]
151 pub async fn accept_receive_stream(
152 &mut self,
153 ) -> $crate::connection::Result<Option<$crate::stream::ReceiveStream>> {
154 futures::future::poll_fn(|cx| self.poll_accept_receive_stream(cx)).await
155 }
156
157 /// Poll for accepting an incoming [`ReceiveStream`](`crate::stream::ReceiveStream`)
158 ///
159 /// The method will return
160 /// - `Poll::Ready(Ok(Some(stream)))` if a [`ReceiveStream`](`crate::stream::ReceiveStream`) was accepted
161 /// - `Poll::Ready(Ok(None))` if the connection was closed without an error
162 /// - `Poll::Ready(Err(stream_error))` if no stream could be accepted due to an error
163 /// - `Poll::Pending` if no new [`ReceiveStream`](`crate::stream::ReceiveStream`) was accepted by the connection yet.
164 /// In this case the caller must retry calling [`Self::poll_accept_receive_stream`].
165 /// For this purpose the method will save the [`core::task::Waker`]
166 /// which is provided as part of the [`core::task::Context`] parameter, and notify it
167 /// as soon as retrying the method will yield a different result.
168 #[inline]
169 pub fn poll_accept_receive_stream(
170 &mut self,
171 cx: &mut core::task::Context,
172 ) -> core::task::Poll<$crate::connection::Result<Option<$crate::stream::ReceiveStream>>> {
173 Ok(core::task::ready!(self
174 .0
175 .poll_accept(Some(s2n_quic_core::stream::StreamType::Unidirectional), cx))?
176 .map(|stream| $crate::stream::ReceiveStream::new(stream.into())))
177 .into()
178 }
179 };
180}
181
182#[derive(Debug)]
183pub struct StreamAcceptor(pub(crate) s2n_quic_transport::connection::Connection);
184
185impl StreamAcceptor {
186 impl_accept_api!();
187
188 /// Splits the [`StreamAcceptor`] into [`BidirectionalStreamAcceptor`] and [`ReceiveStreamAcceptor`] halves
189 ///
190 /// # Examples
191 ///
192 /// ```rust,no_run
193 /// # use bytes::Bytes;
194 /// # async fn test() -> s2n_quic::stream::Result<()> {
195 /// # let connection: s2n_quic::connection::Connection = todo!();
196 /// #
197 /// let (handle, acceptor) = connection.split();
198 /// let (mut bidi, mut recv) = acceptor.split();
199 ///
200 /// tokio::spawn(async move {
201 /// while let Ok(Some(mut stream)) = bidi.accept_bidirectional_stream().await {
202 /// println!("Bidirectional stream opened from {:?}", stream.connection().remote_addr());
203 /// }
204 /// });
205 ///
206 /// tokio::spawn(async move {
207 /// while let Ok(Some(mut stream)) = recv.accept_receive_stream().await {
208 /// println!("Receive stream opened from {:?}", stream.connection().remote_addr());
209 /// }
210 /// });
211 /// #
212 /// # Ok(())
213 /// # }
214 /// ```
215 #[inline]
216 pub fn split(self) -> (BidirectionalStreamAcceptor, ReceiveStreamAcceptor) {
217 let bidi = BidirectionalStreamAcceptor(self.0.clone());
218 let recv = ReceiveStreamAcceptor(self.0);
219 (bidi, recv)
220 }
221}
222
223impl futures::stream::Stream for StreamAcceptor {
224 type Item = connection::Result<PeerStream>;
225
226 #[inline]
227 fn poll_next(
228 mut self: core::pin::Pin<&mut Self>,
229 cx: &mut core::task::Context<'_>,
230 ) -> core::task::Poll<Option<Self::Item>> {
231 match core::task::ready!(self.poll_accept(cx)) {
232 Ok(Some(stream)) => Some(Ok(stream)),
233 Ok(None) => None,
234 Err(err) => Some(Err(err)),
235 }
236 .into()
237 }
238}
239
240#[derive(Debug)]
241pub struct BidirectionalStreamAcceptor(s2n_quic_transport::connection::Connection);
242
243impl BidirectionalStreamAcceptor {
244 impl_accept_bidirectional_api!();
245}
246
247impl futures::stream::Stream for BidirectionalStreamAcceptor {
248 type Item = connection::Result<BidirectionalStream>;
249
250 #[inline]
251 fn poll_next(
252 mut self: core::pin::Pin<&mut Self>,
253 cx: &mut core::task::Context<'_>,
254 ) -> core::task::Poll<Option<Self::Item>> {
255 match core::task::ready!(self.poll_accept_bidirectional_stream(cx)) {
256 Ok(Some(stream)) => Some(Ok(stream)),
257 Ok(None) => None,
258 Err(err) => Some(Err(err)),
259 }
260 .into()
261 }
262}
263
264#[derive(Debug)]
265pub struct ReceiveStreamAcceptor(s2n_quic_transport::connection::Connection);
266
267impl ReceiveStreamAcceptor {
268 impl_accept_receive_api!();
269}
270
271impl futures::stream::Stream for ReceiveStreamAcceptor {
272 type Item = connection::Result<ReceiveStream>;
273
274 #[inline]
275 fn poll_next(
276 mut self: core::pin::Pin<&mut Self>,
277 cx: &mut core::task::Context<'_>,
278 ) -> core::task::Poll<Option<Self::Item>> {
279 match core::task::ready!(self.poll_accept_receive_stream(cx)) {
280 Ok(Some(stream)) => Some(Ok(stream)),
281 Ok(None) => None,
282 Err(err) => Some(Err(err)),
283 }
284 .into()
285 }
286}