s2n_quic/stream/
send.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use s2n_quic_transport::stream;
5
6/// A QUIC stream that is only allowed to send data.
7#[derive(Debug)]
8pub struct SendStream(stream::SendStream);
9
10macro_rules! impl_send_stream_api {
11    (| $stream:ident, $dispatch:ident | $dispatch_body:expr) => {
12        /// Enqueues a chunk of data for sending it towards the peer.
13        ///
14        /// # Return value
15        ///
16        /// The function returns:
17        ///
18        /// - `Ok(())` if the data was enqueued for sending.
19        /// - `Err(e)` if the stream encountered a [`stream::Error`](crate::stream::Error).
20        ///
21        /// # Examples
22        ///
23        /// ```rust,no_run
24        /// # async fn test() -> s2n_quic::stream::Result<()> {
25        /// #   let stream: s2n_quic::stream::SendStream = todo!();
26        /// #
27        /// let data = bytes::Bytes::from_static(&[1, 2, 3, 4]);
28        /// stream.send(data).await?;
29        /// #
30        /// #   Ok(())
31        /// # }
32        /// ```
33        #[inline]
34        pub async fn send(&mut self, mut data: bytes::Bytes) -> $crate::stream::Result<()> {
35            ::futures::future::poll_fn(|cx| self.poll_send(&mut data, cx)).await
36        }
37
38        /// Enqueues a chunk of data for sending it towards the peer.
39        ///
40        /// # Return value
41        ///
42        /// The function returns:
43        ///
44        /// - `Poll::Pending` if the stream's send buffer capacity is currently exhausted. In this case,
45        ///   the caller should retry sending after the [`Waker`](core::task::Waker) on the provided
46        ///   [`Context`](core::task::Context) is notified.
47        /// - `Poll::Ready(Ok(()))` if the data was enqueued for sending. The provided `chunk` will
48        ///   be replaced with an empty [`Bytes`](bytes::Bytes).
49        /// - `Poll::Ready(Err(e))` if the stream encountered a [`stream::Error`](crate::stream::Error).
50        #[inline]
51        pub fn poll_send(
52            &mut self,
53            chunk: &mut bytes::Bytes,
54            cx: &mut core::task::Context,
55        ) -> core::task::Poll<$crate::stream::Result<()>> {
56            macro_rules! $dispatch {
57                () => {
58                    Err($crate::stream::Error::non_writable()).into()
59                };
60                ($variant: expr) => {
61                    s2n_quic_core::task::waker::debug_assert_contract(cx, |cx| {
62                        $variant.poll_send(chunk, cx)
63                    })
64                };
65            }
66
67            let $stream = self;
68            $dispatch_body
69        }
70
71        /// Enqueues a slice of chunks of data for sending it towards the peer.
72        ///
73        /// # Return value
74        ///
75        /// The function returns:
76        ///
77        /// - `Ok(())` if all of the chunks of data were enqueued for sending. Each of the
78        ///   consumed [`Bytes`](bytes::Bytes) will be replaced with an empty [`Bytes`](bytes::Bytes).
79        /// - `Err(e)` if the stream encountered a [`stream::Error`](crate::stream::Error).
80        ///
81        /// # Examples
82        ///
83        /// ```rust,no_run
84        /// # async fn test() -> s2n_quic::stream::Result<()> {
85        /// #   let stream: s2n_quic::stream::SendStream = todo!();
86        /// #
87        /// let mut data1 = bytes::Bytes::from_static(&[1, 2, 3]);
88        /// let mut data2 = bytes::Bytes::from_static(&[4, 5, 6]);
89        /// let mut data3 = bytes::Bytes::from_static(&[7, 8, 9]);
90        /// let chunks = [data1, data2, data3];
91        /// stream.send_vectored(&mut chunks).await?;
92        /// #
93        /// #   Ok(())
94        /// # }
95        /// ```
96        #[inline]
97        pub async fn send_vectored(
98            &mut self,
99            chunks: &mut [bytes::Bytes],
100        ) -> $crate::stream::Result<()> {
101            let mut sent_chunks = 0;
102
103            ::futures::future::poll_fn(|cx| {
104                sent_chunks +=
105                    ::core::task::ready!(self.poll_send_vectored(&mut chunks[sent_chunks..], cx))?;
106                if sent_chunks == chunks.len() {
107                    return Ok(()).into();
108                }
109                core::task::Poll::Pending
110            })
111            .await
112        }
113
114        /// Polls enqueueing a slice of chunks of data for sending it towards the peer.
115        ///
116        /// # Return value
117        ///
118        /// The function returns:
119        ///
120        /// - `Poll::Pending` if the stream's send buffer capacity is currently exhausted. In this case,
121        ///   the caller should retry sending after the [`Waker`](core::task::Waker) on the provided
122        ///   [`Context`](core::task::Context) is notified.
123        /// - `Poll::Ready(Ok(count))` if one or more chunks of data were enqueued for sending. Any of the
124        ///   consumed [`Bytes`](bytes::Bytes) will be replaced with an empty [`Bytes`](bytes::Bytes).
125        ///   If `count` does not equal the total number of chunks, the stream will store the
126        ///   [Waker](core::task::Waker) and notify the task once more capacity is available.
127        /// - `Poll::Ready(Err(e))` if the stream encountered a [`stream::Error`](crate::stream::Error).
128        #[inline]
129        pub fn poll_send_vectored(
130            &mut self,
131            chunks: &mut [bytes::Bytes],
132            cx: &mut core::task::Context,
133        ) -> core::task::Poll<$crate::stream::Result<usize>> {
134            macro_rules! $dispatch {
135                () => {
136                    Err($crate::stream::Error::non_writable()).into()
137                };
138                ($variant: expr) => {
139                    s2n_quic_core::task::waker::debug_assert_contract(cx, |cx| {
140                        $variant.poll_send_vectored(chunks, cx)
141                    })
142                };
143            }
144
145            let $stream = self;
146            $dispatch_body
147        }
148
149        /// Polls send readiness for the given stream.
150        ///
151        /// This method _must_ be called before calling [`send_data`](Self::send_data).
152        ///
153        /// # Return value
154        ///
155        /// The function returns:
156        /// - `Poll::Pending` if the stream's send buffer capacity is currently exhausted. In this case,
157        ///   the caller should retry sending after the [`Waker`](core::task::Waker) on the provided
158        ///   [`Context`](core::task::Context) is notified.
159        /// - `Poll::Ready(Ok(available_bytes))` if the stream is ready to send data, where
160        ///   `available_bytes` is how many bytes the stream can currently accept.
161        /// - `Poll::Ready(Err(e))` if the stream encountered a [`stream::Error`](crate::stream::Error).
162        #[inline]
163        pub fn poll_send_ready(
164            &mut self,
165            cx: &mut core::task::Context,
166        ) -> core::task::Poll<$crate::stream::Result<usize>> {
167            macro_rules! $dispatch {
168                () => {
169                    Err($crate::stream::Error::non_writable()).into()
170                };
171                ($variant: expr) => {
172                    s2n_quic_core::task::waker::debug_assert_contract(cx, |cx| {
173                        $variant.poll_send_ready(cx)
174                    })
175                };
176            }
177
178            let $stream = self;
179            $dispatch_body
180        }
181
182        /// Sends data on the stream without blocking the task.
183        ///
184        /// [`poll_send_ready`](Self::poll_send_ready) _must_ be called before calling this method.
185        ///
186        /// # Return value
187        ///
188        /// The function returns:
189        /// - `Ok(())` if the data was enqueued for sending.
190        /// - `Err(SendingBlocked)` if the stream did not have enough capacity to enqueue the
191        ///   `chunk`.
192        /// - `Err(e)` if the stream encountered a [`stream::Error`](crate::stream::Error).
193        #[inline]
194        pub fn send_data(&mut self, chunk: bytes::Bytes) -> $crate::stream::Result<()> {
195            macro_rules! $dispatch {
196                () => {
197                    Err($crate::stream::Error::non_writable())
198                };
199                ($variant: expr) => {
200                    $variant.send_data(chunk)
201                };
202            }
203
204            let $stream = self;
205            $dispatch_body
206        }
207
208        /// Flushes the stream and waits for the peer to receive all outstanding data.
209        ///
210        /// # Return value
211        ///
212        /// The function returns:
213        /// - `Ok(())` if the send buffer was completely flushed and acknowledged by
214        ///   the peer.
215        /// - `Err(e)` if the stream encountered a [`stream::Error`](crate::stream::Error).
216        ///
217        /// # Note
218        ///
219        /// Using this function can potentially introduce additional latency, as it will only
220        /// return after all outstanding data is acknowledged by the peer. For this reason, the
221        /// `AsyncWrite` trait implementations do not use this functionality and are merely no-ops.
222        ///
223        /// # Examples
224        ///
225        /// ```rust,no_run
226        /// # async fn test() -> s2n_quic::stream::Result<()> {
227        /// #   let stream: s2n_quic::stream::SendStream = todo!();
228        /// #
229        /// let data = bytes::Bytes::from_static(&[1, 2, 3, 4]);
230        /// stream.send(data).await?;
231        /// stream.flush().await?;
232        /// // at this point, the peer has received all of the `data`
233        /// #
234        /// #   Ok(())
235        /// # }
236        /// ```
237        #[inline]
238        pub async fn flush(&mut self) -> $crate::stream::Result<()> {
239            ::futures::future::poll_fn(|cx| self.poll_flush(cx)).await
240        }
241
242        /// Polls flushing the stream and waits for the peer to receive all outstanding data.
243        ///
244        /// # Return value
245        ///
246        /// The function returns:
247        /// - `Poll::Pending` if the stream's send buffer is still being sent. In this case,
248        ///   the caller should retry sending after the [`Waker`](core::task::Waker) on the provided
249        ///   [`Context`](core::task::Context) is notified.
250        /// - `Poll::Ready(Ok(()))` if the send buffer was completely flushed and acknowledged by
251        ///   the peer.
252        /// - `Poll::Ready(Err(e))` if the stream encountered a [`stream::Error`](crate::stream::Error).
253        #[inline]
254        pub fn poll_flush(
255            &mut self,
256            cx: &mut core::task::Context,
257        ) -> core::task::Poll<$crate::stream::Result<()>> {
258            macro_rules! $dispatch {
259                () => {
260                    Err($crate::stream::Error::non_writable()).into()
261                };
262                ($variant: expr) => {
263                    s2n_quic_core::task::waker::debug_assert_contract(cx, |cx| {
264                        $variant.poll_flush(cx)
265                    })
266                };
267            }
268
269            let $stream = self;
270            $dispatch_body
271        }
272
273        /// Marks the stream as finished.
274        ///
275        /// This method returns immediately without notifying the caller that all of the outstanding
276        /// data has been received by the peer. An application wanting to both [`finish`](Self::finish)
277        /// and [`flush`](Self::flush) the outstanding data can use [`close`](Self::close) to accomplish
278        /// this.
279        ///
280        /// __NOTE__: This method will be called when the [`stream`](Self) is dropped.
281        ///
282        /// # Return value
283        ///
284        /// The function returns:
285        /// - `Ok(())` if the stream was finished successfully.
286        /// - `Err(e)` if the stream encountered a [`stream::Error`](crate::stream::Error).
287        #[inline]
288        pub fn finish(&mut self) -> $crate::stream::Result<()> {
289            macro_rules! $dispatch {
290                () => {
291                    Err($crate::stream::Error::non_writable()).into()
292                };
293                ($variant: expr) => {
294                    $variant.finish()
295                };
296            }
297
298            let $stream = self;
299            $dispatch_body
300        }
301
302        /// Marks the stream as finished and waits for all outstanding data to be acknowledged.
303        ///
304        /// This method is equivalent to calling [`finish`](Self::finish) and [`flush`](Self::flush).
305        ///
306        /// # Return value
307        ///
308        /// The function returns:
309        /// - `Ok(())` if the send buffer was completely flushed and acknowledged by
310        ///   the peer.
311        /// - `Err(e)` if the stream encountered a [`stream::Error`](crate::stream::Error).
312        ///
313        /// # Examples
314        ///
315        /// ```rust,no_run
316        /// # async fn test() -> s2n_quic::stream::Result<()> {
317        /// #   let stream: s2n_quic::stream::SendStream = todo!();
318        /// #
319        /// let data = bytes::Bytes::from_static(&[1, 2, 3, 4]);
320        /// stream.send(data).await?;
321        /// stream.close().await?;
322        /// // at this point, the peer has received all of the `data` and has acknowledged the
323        /// // stream being finished.
324        /// #
325        /// #   Ok(())
326        /// # }
327        /// ```
328        #[inline]
329        pub async fn close(&mut self) -> $crate::stream::Result<()> {
330            ::futures::future::poll_fn(|cx| self.poll_close(cx)).await
331        }
332
333        /// Marks the stream as finished and polls for all outstanding data to be acknowledged.
334        ///
335        /// This method is equivalent to calling [`finish`](Self::finish) and [`flush`](Self::flush).
336        ///
337        /// # Return value
338        ///
339        /// The function returns:
340        /// - `Poll::Pending` if the stream's send buffer is still being sent. In this case,
341        ///   the caller should retry sending after the [`Waker`](core::task::Waker) on the provided
342        ///   [`Context`](core::task::Context) is notified.
343        /// - `Poll::Ready(Ok(()))` if the send buffer was completely flushed and acknowledged by
344        ///   the peer.
345        /// - `Poll::Ready(Err(e))` if the stream encountered a [`stream::Error`](crate::stream::Error).
346        #[inline]
347        pub fn poll_close(
348            &mut self,
349            cx: &mut core::task::Context,
350        ) -> core::task::Poll<$crate::stream::Result<()>> {
351            macro_rules! $dispatch {
352                () => {
353                    Err($crate::stream::Error::non_writable()).into()
354                };
355                ($variant: expr) => {
356                    s2n_quic_core::task::waker::debug_assert_contract(cx, |cx| {
357                        $variant.poll_close(cx)
358                    })
359                };
360            }
361
362            let $stream = self;
363            $dispatch_body
364        }
365
366        /// Closes the stream with an [error code](crate::application::Error).
367        ///
368        /// After calling this, the stream is closed and will not accept any additional data to be
369        /// sent to the peer. The peer will also be notified of the [error
370        /// code](crate::application::Error).
371        ///
372        /// # Return value
373        ///
374        /// The function returns:
375        /// - `Ok(())` if the stream was reset successfully.
376        /// - `Err(e)` if the stream encountered a [`stream::Error`](crate::stream::Error). The
377        ///   stream may have been reset previously, or the connection itself was closed.
378        #[inline]
379        pub fn reset(
380            &mut self,
381            error_code: $crate::application::Error,
382        ) -> $crate::stream::Result<()> {
383            macro_rules! $dispatch {
384                () => {
385                    Err($crate::stream::Error::non_writable())
386                };
387                ($variant: expr) => {
388                    $variant.reset(error_code)
389                };
390            }
391
392            let $stream = self;
393            $dispatch_body
394        }
395    };
396}
397
398macro_rules! impl_send_stream_trait {
399    ($name:ident, | $stream:ident, $dispatch:ident | $dispatch_body:expr) => {
400        impl futures::sink::Sink<bytes::Bytes> for $name {
401            type Error = $crate::stream::Error;
402
403            #[inline]
404            fn poll_ready(
405                mut self: core::pin::Pin<&mut Self>,
406                cx: &mut core::task::Context<'_>,
407            ) -> core::task::Poll<$crate::stream::Result<()>> {
408                core::task::ready!(self.poll_send_ready(cx))?;
409                Ok(()).into()
410            }
411
412            #[inline]
413            fn start_send(
414                mut self: core::pin::Pin<&mut Self>,
415                data: bytes::Bytes,
416            ) -> $crate::stream::Result<()> {
417                self.send_data(data)
418            }
419
420            #[inline]
421            fn poll_flush(
422                self: core::pin::Pin<&mut Self>,
423                _cx: &mut core::task::Context<'_>,
424            ) -> core::task::Poll<$crate::stream::Result<()>> {
425                // no-op - this contract relies on flushing intermediate buffers, not waiting for
426                // the peer to ACK data
427                core::task::Poll::Ready(Ok(()))
428            }
429
430            #[inline]
431            fn poll_close(
432                mut self: core::pin::Pin<&mut Self>,
433                cx: &mut core::task::Context<'_>,
434            ) -> core::task::Poll<$crate::stream::Result<()>> {
435                Self::poll_close(&mut self, cx)
436            }
437        }
438
439        impl futures::io::AsyncWrite for $name {
440            fn poll_write(
441                mut self: core::pin::Pin<&mut Self>,
442                cx: &mut core::task::Context<'_>,
443                buf: &[u8],
444            ) -> core::task::Poll<std::io::Result<usize>> {
445                if buf.is_empty() {
446                    return Ok(0).into();
447                }
448
449                let len = core::task::ready!(self.poll_send_ready(cx))?.min(buf.len());
450                let data = bytes::Bytes::copy_from_slice(&buf[..len]);
451                self.send_data(data)?;
452                Ok(len).into()
453            }
454
455            fn poll_write_vectored(
456                mut self: core::pin::Pin<&mut Self>,
457                cx: &mut core::task::Context<'_>,
458                bufs: &[futures::io::IoSlice],
459            ) -> core::task::Poll<std::io::Result<usize>> {
460                if bufs.is_empty() {
461                    return Ok(0).into();
462                }
463
464                let len = core::task::ready!(self.poll_send_ready(cx))?;
465                let capacity = bufs.iter().map(|buf| buf.len()).sum();
466                let len = len.min(capacity);
467
468                let mut data = bytes::BytesMut::with_capacity(len);
469                for buf in bufs {
470                    // only copy what the window will allow
471                    let to_copy = buf.len().min(len - data.len());
472                    data.extend_from_slice(&buf[..to_copy]);
473
474                    // we're done filling the buffer
475                    if data.len() == len {
476                        break;
477                    }
478                }
479
480                self.send_data(data.freeze())?;
481
482                Ok(len).into()
483            }
484
485            #[inline]
486            fn poll_flush(
487                self: core::pin::Pin<&mut Self>,
488                _cx: &mut core::task::Context<'_>,
489            ) -> core::task::Poll<std::io::Result<()>> {
490                // no-op - this contract relies on flushing intermediate buffers, not waiting for
491                // the peer to ACK data
492                core::task::Poll::Ready(Ok(()))
493            }
494
495            #[inline]
496            fn poll_close(
497                mut self: core::pin::Pin<&mut Self>,
498                cx: &mut core::task::Context<'_>,
499            ) -> core::task::Poll<std::io::Result<()>> {
500                core::task::ready!($name::poll_close(&mut self, cx))?;
501                Ok(()).into()
502            }
503        }
504
505        impl tokio::io::AsyncWrite for $name {
506            #[inline]
507            fn poll_write(
508                self: core::pin::Pin<&mut Self>,
509                cx: &mut core::task::Context<'_>,
510                buf: &[u8],
511            ) -> core::task::Poll<std::io::Result<usize>> {
512                futures::io::AsyncWrite::poll_write(self, cx, buf)
513            }
514
515            fn poll_write_vectored(
516                mut self: core::pin::Pin<&mut Self>,
517                cx: &mut core::task::Context<'_>,
518                bufs: &[std::io::IoSlice],
519            ) -> core::task::Poll<std::io::Result<usize>> {
520                if bufs.is_empty() {
521                    return Ok(0).into();
522                }
523
524                let len = core::task::ready!(self.poll_send_ready(cx))?;
525                let capacity = bufs.iter().map(|buf| buf.len()).sum();
526                let len = len.min(capacity);
527
528                let mut data = bytes::BytesMut::with_capacity(len);
529                for buf in bufs {
530                    // only copy what the window will allow
531                    let to_copy = buf.len().min(len - data.len());
532                    data.extend_from_slice(&buf[..to_copy]);
533
534                    // we're done filling the buffer
535                    if data.len() == len {
536                        break;
537                    }
538                }
539
540                self.send_data(data.freeze())?;
541
542                Ok(len).into()
543            }
544
545            #[inline]
546            fn is_write_vectored(&self) -> bool {
547                true
548            }
549
550            #[inline]
551            fn poll_flush(
552                self: core::pin::Pin<&mut Self>,
553                _cx: &mut core::task::Context<'_>,
554            ) -> core::task::Poll<std::io::Result<()>> {
555                // no-op - this contract relies on flushing intermediate buffers, not waiting for
556                // the peer to ACK data
557                core::task::Poll::Ready(Ok(()))
558            }
559
560            #[inline]
561            fn poll_shutdown(
562                mut self: core::pin::Pin<&mut Self>,
563                cx: &mut core::task::Context<'_>,
564            ) -> core::task::Poll<std::io::Result<()>> {
565                core::task::ready!(self.poll_close(cx))?;
566                Ok(()).into()
567            }
568        }
569    };
570}
571
572impl SendStream {
573    #[inline]
574    pub(crate) const fn new(stream: stream::SendStream) -> Self {
575        Self(stream)
576    }
577
578    /// Returns the stream's identifier
579    ///
580    /// This value is unique to a particular connection. The format follows the same as what is
581    /// defined in the
582    /// [QUIC Transport RFC](https://www.rfc-editor.org/rfc/rfc9000.html#name-stream-types-and-identifier).
583    ///
584    /// # Examples
585    ///
586    /// ```rust,no_run
587    /// # async fn test() -> s2n_quic::stream::Result<()> {
588    /// #   let connection: s2n_quic::connection::Connection = todo!();
589    /// #
590    /// let stream = connection.open_send_stream().await?;
591    /// println!("New stream's id: {}", stream.id());
592    /// #
593    /// #   Ok(())
594    /// # }
595    /// ```
596    #[inline]
597    pub fn id(&self) -> u64 {
598        self.0.id().into()
599    }
600
601    impl_connection_api!(|stream| crate::connection::Handle(stream.0.connection().clone()));
602
603    impl_send_stream_api!(|stream, dispatch| dispatch!(stream.0));
604}
605
606impl_splittable_stream_trait!(SendStream, |stream| (None, Some(stream)));
607impl_send_stream_trait!(SendStream, |stream, dispatch| dispatch!(stream.0));