s2n_quic/connection/
acceptor.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    connection,
6    stream::{BidirectionalStream, PeerStream, ReceiveStream},
7};
8
9macro_rules! impl_accept_api {
10    () => {
11        /// Accepts an incoming [`PeerStream`](`crate::stream::PeerStream`)
12        ///
13        /// The method will return
14        /// - `Ok(Some(stream)))` if a [`PeerStream`](`crate::stream::PeerStream`) was accepted
15        /// - `Ok(None)` if the connection was closed without an error
16        /// - `Err(stream_error)` if no stream could be accepted due to an error
17        ///
18        /// # Examples
19        ///
20        /// ```rust,no_run
21        /// # async fn test() -> s2n_quic::stream::Result<()> {
22        /// #   let mut acceptor: s2n_quic::connection::StreamAcceptor = todo!();
23        /// #
24        /// while let Some(stream) = acceptor.accept().await? {
25        ///     println!("Stream opened from {:?}", stream.connection().remote_addr());
26        /// }
27        /// #
28        /// #   Ok(())
29        /// # }
30        /// ```
31        #[inline]
32        pub async fn accept(
33            &mut self,
34        ) -> crate::connection::Result<Option<crate::stream::PeerStream>> {
35            futures::future::poll_fn(|cx| self.poll_accept(cx)).await
36        }
37
38        /// Poll for accepting an incoming [`PeerStream`](`crate::stream::PeerStream`)
39        ///
40        /// The method will return
41        /// - `Poll::Ready(Ok(Some(stream)))` if a [`PeerStream`](`crate::stream::PeerStream`) was accepted
42        /// - `Poll::Ready(Ok(None))` if the connection was closed without an error
43        /// - `Poll::Ready(Err(stream_error))` if no stream could be accepted due to an error
44        /// - `Poll::Pending` if no new [`PeerStream`](`crate::stream::PeerStream`) was accepted by the connection yet.
45        ///   In this case the caller must retry calling [`Self::poll_accept`].
46        ///   For this purpose the method will save the [`core::task::Waker`]
47        ///   which is provided as part of the [`core::task::Context`] parameter, and notify it
48        ///   as soon as retrying the method will yield a different result.
49        #[inline]
50        pub fn poll_accept(
51            &mut self,
52            cx: &mut core::task::Context,
53        ) -> core::task::Poll<crate::connection::Result<Option<crate::stream::PeerStream>>> {
54            use s2n_quic_core::stream::StreamType;
55            use $crate::stream::{BidirectionalStream, ReceiveStream};
56
57            Ok(
58                core::task::ready!(self.0.poll_accept(None, cx))?.map(|stream| {
59                    match stream.id().stream_type() {
60                        StreamType::Unidirectional => ReceiveStream::new(stream.into()).into(),
61                        StreamType::Bidirectional => BidirectionalStream::new(stream).into(),
62                    }
63                }),
64            )
65            .into()
66        }
67
68        impl_accept_bidirectional_api!();
69        impl_accept_receive_api!();
70    };
71}
72
73macro_rules! impl_accept_bidirectional_api {
74    () => {
75        /// Accepts an incoming [`BidirectionalStream`](`crate::stream::BidirectionalStream`)
76        ///
77        /// The method will return
78        /// - `Ok(Some(stream)))` if a [`BidirectionalStream`](`crate::stream::BidirectionalStream`) was accepted
79        /// - `Ok(None)` if the connection was closed without an error
80        /// - `Err(stream_error)` if no stream could be accepted due to an error
81        ///
82        /// # Examples
83        ///
84        /// ```rust,no_run
85        /// # async fn test() -> s2n_quic::stream::Result<()> {
86        /// #   let mut acceptor: s2n_quic::connection::StreamAcceptor = todo!();
87        /// #
88        /// while let Ok(Some(mut stream)) = acceptor.accept_bidirectional_stream().await {
89        ///     println!("Stream opened from {:?}", stream.connection().remote_addr());
90        /// }
91        /// #
92        /// #   Ok(())
93        /// # }
94        /// ```
95        #[inline]
96        pub async fn accept_bidirectional_stream(
97            &mut self,
98        ) -> $crate::connection::Result<Option<$crate::stream::BidirectionalStream>> {
99            futures::future::poll_fn(|cx| self.poll_accept_bidirectional_stream(cx)).await
100        }
101
102        /// Poll for accepting an incoming [`BidirectionalStream`](`crate::stream::BidirectionalStream`)
103        ///
104        /// The method will return
105        /// - `Poll::Ready(Ok(Some(stream)))` if a [`BidirectionalStream`](`crate::stream::BidirectionalStream`) was accepted
106        /// - `Poll::Ready(Ok(None))` if the connection was closed without an error
107        /// - `Poll::Ready(Err(stream_error))` if no stream could be accepted due to an error
108        /// - `Poll::Pending` if no new [`BidirectionalStream`](`crate::stream::BidirectionalStream`) was accepted by the connection yet.
109        ///   In this case the caller must retry calling [`Self::poll_accept_bidirectional_stream`].
110        ///   For this purpose the method will save the [`core::task::Waker`]
111        ///   which is provided as part of the [`core::task::Context`] parameter, and notify it
112        ///   as soon as retrying the method will yield a different result.
113        #[inline]
114        pub fn poll_accept_bidirectional_stream(
115            &mut self,
116            cx: &mut core::task::Context,
117        ) -> core::task::Poll<$crate::connection::Result<Option<$crate::stream::BidirectionalStream>>> {
118            Ok(
119                core::task::ready!(self
120                    .0
121                    .poll_accept(Some(s2n_quic_core::stream::StreamType::Bidirectional), cx)
122                )?.map($crate::stream::BidirectionalStream::new)
123            ).into()
124        }
125    };
126}
127
128macro_rules! impl_accept_receive_api {
129    () => {
130        /// Accepts an incoming [`ReceiveStream`](`crate::stream::ReceiveStream`)
131        ///
132        /// The method will return
133        /// - `Ok(Some(stream)))` if a [`ReceiveStream`](`crate::stream::ReceiveStream`) was accepted
134        /// - `Ok(None)` if the connection was closed without an error
135        /// - `Err(stream_error)` if no stream could be accepted due to an error
136        ///
137        /// # Examples
138        ///
139        /// ```rust,no_run
140        /// # async fn test() -> s2n_quic::stream::Result<()> {
141        /// #   let mut acceptor: s2n_quic::connection::StreamAcceptor = todo!();
142        /// #
143        /// while let Ok(Some(mut stream)) = acceptor.accept_receive_stream().await {
144        ///     println!("Stream opened from {:?}", stream.connection().remote_addr());
145        /// }
146        /// #
147        /// #   Ok(())
148        /// # }
149        /// ```
150        #[inline]
151        pub async fn accept_receive_stream(
152            &mut self,
153        ) -> $crate::connection::Result<Option<$crate::stream::ReceiveStream>> {
154            futures::future::poll_fn(|cx| self.poll_accept_receive_stream(cx)).await
155        }
156
157        /// Poll for accepting an incoming [`ReceiveStream`](`crate::stream::ReceiveStream`)
158        ///
159        /// The method will return
160        /// - `Poll::Ready(Ok(Some(stream)))` if a [`ReceiveStream`](`crate::stream::ReceiveStream`) was accepted
161        /// - `Poll::Ready(Ok(None))` if the connection was closed without an error
162        /// - `Poll::Ready(Err(stream_error))` if no stream could be accepted due to an error
163        /// - `Poll::Pending` if no new [`ReceiveStream`](`crate::stream::ReceiveStream`) was accepted by the connection yet.
164        ///   In this case the caller must retry calling [`Self::poll_accept_receive_stream`].
165        ///   For this purpose the method will save the [`core::task::Waker`]
166        ///   which is provided as part of the [`core::task::Context`] parameter, and notify it
167        ///   as soon as retrying the method will yield a different result.
168        #[inline]
169        pub fn poll_accept_receive_stream(
170            &mut self,
171            cx: &mut core::task::Context,
172        ) -> core::task::Poll<$crate::connection::Result<Option<$crate::stream::ReceiveStream>>> {
173            Ok(core::task::ready!(self
174                .0
175                .poll_accept(Some(s2n_quic_core::stream::StreamType::Unidirectional), cx))?
176            .map(|stream| $crate::stream::ReceiveStream::new(stream.into())))
177            .into()
178        }
179    };
180}
181
182#[derive(Debug)]
183pub struct StreamAcceptor(pub(crate) s2n_quic_transport::connection::Connection);
184
185impl StreamAcceptor {
186    impl_accept_api!();
187
188    /// Splits the [`StreamAcceptor`] into [`BidirectionalStreamAcceptor`] and [`ReceiveStreamAcceptor`] halves
189    ///
190    /// # Examples
191    ///
192    /// ```rust,no_run
193    /// # use bytes::Bytes;
194    /// # async fn test() -> s2n_quic::stream::Result<()> {
195    /// #   let connection: s2n_quic::connection::Connection = todo!();
196    /// #
197    /// let (handle, acceptor) = connection.split();
198    /// let (mut bidi, mut recv) = acceptor.split();
199    ///
200    /// tokio::spawn(async move {
201    ///     while let Ok(Some(mut stream)) = bidi.accept_bidirectional_stream().await {
202    ///       println!("Bidirectional stream opened from {:?}", stream.connection().remote_addr());
203    ///     }
204    /// });
205    ///
206    /// tokio::spawn(async move {
207    ///     while let Ok(Some(mut stream)) = recv.accept_receive_stream().await {
208    ///       println!("Receive stream opened from {:?}", stream.connection().remote_addr());
209    ///     }
210    /// });
211    /// #
212    /// #   Ok(())
213    /// # }
214    /// ```
215    #[inline]
216    pub fn split(self) -> (BidirectionalStreamAcceptor, ReceiveStreamAcceptor) {
217        let bidi = BidirectionalStreamAcceptor(self.0.clone());
218        let recv = ReceiveStreamAcceptor(self.0);
219        (bidi, recv)
220    }
221}
222
223impl futures::stream::Stream for StreamAcceptor {
224    type Item = connection::Result<PeerStream>;
225
226    #[inline]
227    fn poll_next(
228        mut self: core::pin::Pin<&mut Self>,
229        cx: &mut core::task::Context<'_>,
230    ) -> core::task::Poll<Option<Self::Item>> {
231        match core::task::ready!(self.poll_accept(cx)) {
232            Ok(Some(stream)) => Some(Ok(stream)),
233            Ok(None) => None,
234            Err(err) => Some(Err(err)),
235        }
236        .into()
237    }
238}
239
240#[derive(Debug)]
241pub struct BidirectionalStreamAcceptor(s2n_quic_transport::connection::Connection);
242
243impl BidirectionalStreamAcceptor {
244    impl_accept_bidirectional_api!();
245}
246
247impl futures::stream::Stream for BidirectionalStreamAcceptor {
248    type Item = connection::Result<BidirectionalStream>;
249
250    #[inline]
251    fn poll_next(
252        mut self: core::pin::Pin<&mut Self>,
253        cx: &mut core::task::Context<'_>,
254    ) -> core::task::Poll<Option<Self::Item>> {
255        match core::task::ready!(self.poll_accept_bidirectional_stream(cx)) {
256            Ok(Some(stream)) => Some(Ok(stream)),
257            Ok(None) => None,
258            Err(err) => Some(Err(err)),
259        }
260        .into()
261    }
262}
263
264#[derive(Debug)]
265pub struct ReceiveStreamAcceptor(s2n_quic_transport::connection::Connection);
266
267impl ReceiveStreamAcceptor {
268    impl_accept_receive_api!();
269}
270
271impl futures::stream::Stream for ReceiveStreamAcceptor {
272    type Item = connection::Result<ReceiveStream>;
273
274    #[inline]
275    fn poll_next(
276        mut self: core::pin::Pin<&mut Self>,
277        cx: &mut core::task::Context<'_>,
278    ) -> core::task::Poll<Option<Self::Item>> {
279        match core::task::ready!(self.poll_accept_receive_stream(cx)) {
280            Ok(Some(stream)) => Some(Ok(stream)),
281            Ok(None) => None,
282            Err(err) => Some(Err(err)),
283        }
284        .into()
285    }
286}