s2n_quic/stream/send.rs
1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use s2n_quic_transport::stream;
5
6/// A QUIC stream that is only allowed to send data.
7#[derive(Debug)]
8pub struct SendStream(stream::SendStream);
9
10macro_rules! impl_send_stream_api {
11 (| $stream:ident, $dispatch:ident | $dispatch_body:expr) => {
12 /// Enqueues a chunk of data for sending it towards the peer.
13 ///
14 /// # Return value
15 ///
16 /// The function returns:
17 ///
18 /// - `Ok(())` if the data was enqueued for sending.
19 /// - `Err(e)` if the stream encountered a [`stream::Error`](crate::stream::Error).
20 ///
21 /// # Examples
22 ///
23 /// ```rust,no_run
24 /// # async fn test() -> s2n_quic::stream::Result<()> {
25 /// # let stream: s2n_quic::stream::SendStream = todo!();
26 /// #
27 /// let data = bytes::Bytes::from_static(&[1, 2, 3, 4]);
28 /// stream.send(data).await?;
29 /// #
30 /// # Ok(())
31 /// # }
32 /// ```
33 #[inline]
34 pub async fn send(&mut self, mut data: bytes::Bytes) -> $crate::stream::Result<()> {
35 ::futures::future::poll_fn(|cx| self.poll_send(&mut data, cx)).await
36 }
37
38 /// Enqueues a chunk of data for sending it towards the peer.
39 ///
40 /// # Return value
41 ///
42 /// The function returns:
43 ///
44 /// - `Poll::Pending` if the stream's send buffer capacity is currently exhausted. In this case,
45 /// the caller should retry sending after the [`Waker`](core::task::Waker) on the provided
46 /// [`Context`](core::task::Context) is notified.
47 /// - `Poll::Ready(Ok(()))` if the data was enqueued for sending. The provided `chunk` will
48 /// be replaced with an empty [`Bytes`](bytes::Bytes).
49 /// - `Poll::Ready(Err(e))` if the stream encountered a [`stream::Error`](crate::stream::Error).
50 #[inline]
51 pub fn poll_send(
52 &mut self,
53 chunk: &mut bytes::Bytes,
54 cx: &mut core::task::Context,
55 ) -> core::task::Poll<$crate::stream::Result<()>> {
56 macro_rules! $dispatch {
57 () => {
58 Err($crate::stream::Error::non_writable()).into()
59 };
60 ($variant: expr) => {
61 s2n_quic_core::task::waker::debug_assert_contract(cx, |cx| {
62 $variant.poll_send(chunk, cx)
63 })
64 };
65 }
66
67 let $stream = self;
68 $dispatch_body
69 }
70
71 /// Enqueues a slice of chunks of data for sending it towards the peer.
72 ///
73 /// # Return value
74 ///
75 /// The function returns:
76 ///
77 /// - `Ok(())` if all of the chunks of data were enqueued for sending. Each of the
78 /// consumed [`Bytes`](bytes::Bytes) will be replaced with an empty [`Bytes`](bytes::Bytes).
79 /// - `Err(e)` if the stream encountered a [`stream::Error`](crate::stream::Error).
80 ///
81 /// # Examples
82 ///
83 /// ```rust,no_run
84 /// # async fn test() -> s2n_quic::stream::Result<()> {
85 /// # let stream: s2n_quic::stream::SendStream = todo!();
86 /// #
87 /// let mut data1 = bytes::Bytes::from_static(&[1, 2, 3]);
88 /// let mut data2 = bytes::Bytes::from_static(&[4, 5, 6]);
89 /// let mut data3 = bytes::Bytes::from_static(&[7, 8, 9]);
90 /// let chunks = [data1, data2, data3];
91 /// stream.send_vectored(&mut chunks).await?;
92 /// #
93 /// # Ok(())
94 /// # }
95 /// ```
96 #[inline]
97 pub async fn send_vectored(
98 &mut self,
99 chunks: &mut [bytes::Bytes],
100 ) -> $crate::stream::Result<()> {
101 let mut sent_chunks = 0;
102
103 ::futures::future::poll_fn(|cx| {
104 sent_chunks +=
105 ::core::task::ready!(self.poll_send_vectored(&mut chunks[sent_chunks..], cx))?;
106 if sent_chunks == chunks.len() {
107 return Ok(()).into();
108 }
109 core::task::Poll::Pending
110 })
111 .await
112 }
113
114 /// Polls enqueueing a slice of chunks of data for sending it towards the peer.
115 ///
116 /// # Return value
117 ///
118 /// The function returns:
119 ///
120 /// - `Poll::Pending` if the stream's send buffer capacity is currently exhausted. In this case,
121 /// the caller should retry sending after the [`Waker`](core::task::Waker) on the provided
122 /// [`Context`](core::task::Context) is notified.
123 /// - `Poll::Ready(Ok(count))` if one or more chunks of data were enqueued for sending. Any of the
124 /// consumed [`Bytes`](bytes::Bytes) will be replaced with an empty [`Bytes`](bytes::Bytes).
125 /// If `count` does not equal the total number of chunks, the stream will store the
126 /// [Waker](core::task::Waker) and notify the task once more capacity is available.
127 /// - `Poll::Ready(Err(e))` if the stream encountered a [`stream::Error`](crate::stream::Error).
128 #[inline]
129 pub fn poll_send_vectored(
130 &mut self,
131 chunks: &mut [bytes::Bytes],
132 cx: &mut core::task::Context,
133 ) -> core::task::Poll<$crate::stream::Result<usize>> {
134 macro_rules! $dispatch {
135 () => {
136 Err($crate::stream::Error::non_writable()).into()
137 };
138 ($variant: expr) => {
139 s2n_quic_core::task::waker::debug_assert_contract(cx, |cx| {
140 $variant.poll_send_vectored(chunks, cx)
141 })
142 };
143 }
144
145 let $stream = self;
146 $dispatch_body
147 }
148
149 /// Polls send readiness for the given stream.
150 ///
151 /// This method _must_ be called before calling [`send_data`](Self::send_data).
152 ///
153 /// # Return value
154 ///
155 /// The function returns:
156 /// - `Poll::Pending` if the stream's send buffer capacity is currently exhausted. In this case,
157 /// the caller should retry sending after the [`Waker`](core::task::Waker) on the provided
158 /// [`Context`](core::task::Context) is notified.
159 /// - `Poll::Ready(Ok(available_bytes))` if the stream is ready to send data, where
160 /// `available_bytes` is how many bytes the stream can currently accept.
161 /// - `Poll::Ready(Err(e))` if the stream encountered a [`stream::Error`](crate::stream::Error).
162 #[inline]
163 pub fn poll_send_ready(
164 &mut self,
165 cx: &mut core::task::Context,
166 ) -> core::task::Poll<$crate::stream::Result<usize>> {
167 macro_rules! $dispatch {
168 () => {
169 Err($crate::stream::Error::non_writable()).into()
170 };
171 ($variant: expr) => {
172 s2n_quic_core::task::waker::debug_assert_contract(cx, |cx| {
173 $variant.poll_send_ready(cx)
174 })
175 };
176 }
177
178 let $stream = self;
179 $dispatch_body
180 }
181
182 /// Sends data on the stream without blocking the task.
183 ///
184 /// [`poll_send_ready`](Self::poll_send_ready) _must_ be called before calling this method.
185 ///
186 /// # Return value
187 ///
188 /// The function returns:
189 /// - `Ok(())` if the data was enqueued for sending.
190 /// - `Err(SendingBlocked)` if the stream did not have enough capacity to enqueue the
191 /// `chunk`.
192 /// - `Err(e)` if the stream encountered a [`stream::Error`](crate::stream::Error).
193 #[inline]
194 pub fn send_data(&mut self, chunk: bytes::Bytes) -> $crate::stream::Result<()> {
195 macro_rules! $dispatch {
196 () => {
197 Err($crate::stream::Error::non_writable())
198 };
199 ($variant: expr) => {
200 $variant.send_data(chunk)
201 };
202 }
203
204 let $stream = self;
205 $dispatch_body
206 }
207
208 /// Flushes the stream and waits for the peer to receive all outstanding data.
209 ///
210 /// # Return value
211 ///
212 /// The function returns:
213 /// - `Ok(())` if the send buffer was completely flushed and acknowledged by
214 /// the peer.
215 /// - `Err(e)` if the stream encountered a [`stream::Error`](crate::stream::Error).
216 ///
217 /// # Note
218 ///
219 /// Using this function can potentially introduce additional latency, as it will only
220 /// return after all outstanding data is acknowledged by the peer. For this reason, the
221 /// `AsyncWrite` trait implementations do not use this functionality and are merely no-ops.
222 ///
223 /// # Examples
224 ///
225 /// ```rust,no_run
226 /// # async fn test() -> s2n_quic::stream::Result<()> {
227 /// # let stream: s2n_quic::stream::SendStream = todo!();
228 /// #
229 /// let data = bytes::Bytes::from_static(&[1, 2, 3, 4]);
230 /// stream.send(data).await?;
231 /// stream.flush().await?;
232 /// // at this point, the peer has received all of the `data`
233 /// #
234 /// # Ok(())
235 /// # }
236 /// ```
237 #[inline]
238 pub async fn flush(&mut self) -> $crate::stream::Result<()> {
239 ::futures::future::poll_fn(|cx| self.poll_flush(cx)).await
240 }
241
242 /// Polls flushing the stream and waits for the peer to receive all outstanding data.
243 ///
244 /// # Return value
245 ///
246 /// The function returns:
247 /// - `Poll::Pending` if the stream's send buffer is still being sent. In this case,
248 /// the caller should retry sending after the [`Waker`](core::task::Waker) on the provided
249 /// [`Context`](core::task::Context) is notified.
250 /// - `Poll::Ready(Ok(()))` if the send buffer was completely flushed and acknowledged by
251 /// the peer.
252 /// - `Poll::Ready(Err(e))` if the stream encountered a [`stream::Error`](crate::stream::Error).
253 #[inline]
254 pub fn poll_flush(
255 &mut self,
256 cx: &mut core::task::Context,
257 ) -> core::task::Poll<$crate::stream::Result<()>> {
258 macro_rules! $dispatch {
259 () => {
260 Err($crate::stream::Error::non_writable()).into()
261 };
262 ($variant: expr) => {
263 s2n_quic_core::task::waker::debug_assert_contract(cx, |cx| {
264 $variant.poll_flush(cx)
265 })
266 };
267 }
268
269 let $stream = self;
270 $dispatch_body
271 }
272
273 /// Marks the stream as finished.
274 ///
275 /// This method returns immediately without notifying the caller that all of the outstanding
276 /// data has been received by the peer. An application wanting to both [`finish`](Self::finish)
277 /// and [`flush`](Self::flush) the outstanding data can use [`close`](Self::close) to accomplish
278 /// this.
279 ///
280 /// __NOTE__: This method will be called when the [`stream`](Self) is dropped.
281 ///
282 /// # Return value
283 ///
284 /// The function returns:
285 /// - `Ok(())` if the stream was finished successfully.
286 /// - `Err(e)` if the stream encountered a [`stream::Error`](crate::stream::Error).
287 #[inline]
288 pub fn finish(&mut self) -> $crate::stream::Result<()> {
289 macro_rules! $dispatch {
290 () => {
291 Err($crate::stream::Error::non_writable()).into()
292 };
293 ($variant: expr) => {
294 $variant.finish()
295 };
296 }
297
298 let $stream = self;
299 $dispatch_body
300 }
301
302 /// Marks the stream as finished and waits for all outstanding data to be acknowledged.
303 ///
304 /// This method is equivalent to calling [`finish`](Self::finish) and [`flush`](Self::flush).
305 ///
306 /// # Return value
307 ///
308 /// The function returns:
309 /// - `Ok(())` if the send buffer was completely flushed and acknowledged by
310 /// the peer.
311 /// - `Err(e)` if the stream encountered a [`stream::Error`](crate::stream::Error).
312 ///
313 /// # Examples
314 ///
315 /// ```rust,no_run
316 /// # async fn test() -> s2n_quic::stream::Result<()> {
317 /// # let stream: s2n_quic::stream::SendStream = todo!();
318 /// #
319 /// let data = bytes::Bytes::from_static(&[1, 2, 3, 4]);
320 /// stream.send(data).await?;
321 /// stream.close().await?;
322 /// // at this point, the peer has received all of the `data` and has acknowledged the
323 /// // stream being finished.
324 /// #
325 /// # Ok(())
326 /// # }
327 /// ```
328 #[inline]
329 pub async fn close(&mut self) -> $crate::stream::Result<()> {
330 ::futures::future::poll_fn(|cx| self.poll_close(cx)).await
331 }
332
333 /// Marks the stream as finished and polls for all outstanding data to be acknowledged.
334 ///
335 /// This method is equivalent to calling [`finish`](Self::finish) and [`flush`](Self::flush).
336 ///
337 /// # Return value
338 ///
339 /// The function returns:
340 /// - `Poll::Pending` if the stream's send buffer is still being sent. In this case,
341 /// the caller should retry sending after the [`Waker`](core::task::Waker) on the provided
342 /// [`Context`](core::task::Context) is notified.
343 /// - `Poll::Ready(Ok(()))` if the send buffer was completely flushed and acknowledged by
344 /// the peer.
345 /// - `Poll::Ready(Err(e))` if the stream encountered a [`stream::Error`](crate::stream::Error).
346 #[inline]
347 pub fn poll_close(
348 &mut self,
349 cx: &mut core::task::Context,
350 ) -> core::task::Poll<$crate::stream::Result<()>> {
351 macro_rules! $dispatch {
352 () => {
353 Err($crate::stream::Error::non_writable()).into()
354 };
355 ($variant: expr) => {
356 s2n_quic_core::task::waker::debug_assert_contract(cx, |cx| {
357 $variant.poll_close(cx)
358 })
359 };
360 }
361
362 let $stream = self;
363 $dispatch_body
364 }
365
366 /// Closes the stream with an [error code](crate::application::Error).
367 ///
368 /// After calling this, the stream is closed and will not accept any additional data to be
369 /// sent to the peer. The peer will also be notified of the [error
370 /// code](crate::application::Error).
371 ///
372 /// # Return value
373 ///
374 /// The function returns:
375 /// - `Ok(())` if the stream was reset successfully.
376 /// - `Err(e)` if the stream encountered a [`stream::Error`](crate::stream::Error). The
377 /// stream may have been reset previously, or the connection itself was closed.
378 #[inline]
379 pub fn reset(
380 &mut self,
381 error_code: $crate::application::Error,
382 ) -> $crate::stream::Result<()> {
383 macro_rules! $dispatch {
384 () => {
385 Err($crate::stream::Error::non_writable())
386 };
387 ($variant: expr) => {
388 $variant.reset(error_code)
389 };
390 }
391
392 let $stream = self;
393 $dispatch_body
394 }
395 };
396}
397
398macro_rules! impl_send_stream_trait {
399 ($name:ident, | $stream:ident, $dispatch:ident | $dispatch_body:expr) => {
400 impl futures::sink::Sink<bytes::Bytes> for $name {
401 type Error = $crate::stream::Error;
402
403 #[inline]
404 fn poll_ready(
405 mut self: core::pin::Pin<&mut Self>,
406 cx: &mut core::task::Context<'_>,
407 ) -> core::task::Poll<$crate::stream::Result<()>> {
408 core::task::ready!(self.poll_send_ready(cx))?;
409 Ok(()).into()
410 }
411
412 #[inline]
413 fn start_send(
414 mut self: core::pin::Pin<&mut Self>,
415 data: bytes::Bytes,
416 ) -> $crate::stream::Result<()> {
417 self.send_data(data)
418 }
419
420 #[inline]
421 fn poll_flush(
422 self: core::pin::Pin<&mut Self>,
423 _cx: &mut core::task::Context<'_>,
424 ) -> core::task::Poll<$crate::stream::Result<()>> {
425 // no-op - this contract relies on flushing intermediate buffers, not waiting for
426 // the peer to ACK data
427 core::task::Poll::Ready(Ok(()))
428 }
429
430 #[inline]
431 fn poll_close(
432 mut self: core::pin::Pin<&mut Self>,
433 cx: &mut core::task::Context<'_>,
434 ) -> core::task::Poll<$crate::stream::Result<()>> {
435 Self::poll_close(&mut self, cx)
436 }
437 }
438
439 impl futures::io::AsyncWrite for $name {
440 fn poll_write(
441 mut self: core::pin::Pin<&mut Self>,
442 cx: &mut core::task::Context<'_>,
443 buf: &[u8],
444 ) -> core::task::Poll<std::io::Result<usize>> {
445 if buf.is_empty() {
446 return Ok(0).into();
447 }
448
449 let len = core::task::ready!(self.poll_send_ready(cx))?.min(buf.len());
450 let data = bytes::Bytes::copy_from_slice(&buf[..len]);
451 self.send_data(data)?;
452 Ok(len).into()
453 }
454
455 fn poll_write_vectored(
456 mut self: core::pin::Pin<&mut Self>,
457 cx: &mut core::task::Context<'_>,
458 bufs: &[futures::io::IoSlice],
459 ) -> core::task::Poll<std::io::Result<usize>> {
460 if bufs.is_empty() {
461 return Ok(0).into();
462 }
463
464 let len = core::task::ready!(self.poll_send_ready(cx))?;
465 let capacity = bufs.iter().map(|buf| buf.len()).sum();
466 let len = len.min(capacity);
467
468 let mut data = bytes::BytesMut::with_capacity(len);
469 for buf in bufs {
470 // only copy what the window will allow
471 let to_copy = buf.len().min(len - data.len());
472 data.extend_from_slice(&buf[..to_copy]);
473
474 // we're done filling the buffer
475 if data.len() == len {
476 break;
477 }
478 }
479
480 self.send_data(data.freeze())?;
481
482 Ok(len).into()
483 }
484
485 #[inline]
486 fn poll_flush(
487 self: core::pin::Pin<&mut Self>,
488 _cx: &mut core::task::Context<'_>,
489 ) -> core::task::Poll<std::io::Result<()>> {
490 // no-op - this contract relies on flushing intermediate buffers, not waiting for
491 // the peer to ACK data
492 core::task::Poll::Ready(Ok(()))
493 }
494
495 #[inline]
496 fn poll_close(
497 mut self: core::pin::Pin<&mut Self>,
498 cx: &mut core::task::Context<'_>,
499 ) -> core::task::Poll<std::io::Result<()>> {
500 core::task::ready!($name::poll_close(&mut self, cx))?;
501 Ok(()).into()
502 }
503 }
504
505 impl tokio::io::AsyncWrite for $name {
506 #[inline]
507 fn poll_write(
508 self: core::pin::Pin<&mut Self>,
509 cx: &mut core::task::Context<'_>,
510 buf: &[u8],
511 ) -> core::task::Poll<std::io::Result<usize>> {
512 futures::io::AsyncWrite::poll_write(self, cx, buf)
513 }
514
515 fn poll_write_vectored(
516 mut self: core::pin::Pin<&mut Self>,
517 cx: &mut core::task::Context<'_>,
518 bufs: &[std::io::IoSlice],
519 ) -> core::task::Poll<std::io::Result<usize>> {
520 if bufs.is_empty() {
521 return Ok(0).into();
522 }
523
524 let len = core::task::ready!(self.poll_send_ready(cx))?;
525 let capacity = bufs.iter().map(|buf| buf.len()).sum();
526 let len = len.min(capacity);
527
528 let mut data = bytes::BytesMut::with_capacity(len);
529 for buf in bufs {
530 // only copy what the window will allow
531 let to_copy = buf.len().min(len - data.len());
532 data.extend_from_slice(&buf[..to_copy]);
533
534 // we're done filling the buffer
535 if data.len() == len {
536 break;
537 }
538 }
539
540 self.send_data(data.freeze())?;
541
542 Ok(len).into()
543 }
544
545 #[inline]
546 fn is_write_vectored(&self) -> bool {
547 true
548 }
549
550 #[inline]
551 fn poll_flush(
552 self: core::pin::Pin<&mut Self>,
553 _cx: &mut core::task::Context<'_>,
554 ) -> core::task::Poll<std::io::Result<()>> {
555 // no-op - this contract relies on flushing intermediate buffers, not waiting for
556 // the peer to ACK data
557 core::task::Poll::Ready(Ok(()))
558 }
559
560 #[inline]
561 fn poll_shutdown(
562 mut self: core::pin::Pin<&mut Self>,
563 cx: &mut core::task::Context<'_>,
564 ) -> core::task::Poll<std::io::Result<()>> {
565 core::task::ready!(self.poll_close(cx))?;
566 Ok(()).into()
567 }
568 }
569 };
570}
571
572impl SendStream {
573 #[inline]
574 pub(crate) const fn new(stream: stream::SendStream) -> Self {
575 Self(stream)
576 }
577
578 /// Returns the stream's identifier
579 ///
580 /// This value is unique to a particular connection. The format follows the same as what is
581 /// defined in the
582 /// [QUIC Transport RFC](https://www.rfc-editor.org/rfc/rfc9000.html#name-stream-types-and-identifier).
583 ///
584 /// # Examples
585 ///
586 /// ```rust,no_run
587 /// # async fn test() -> s2n_quic::stream::Result<()> {
588 /// # let connection: s2n_quic::connection::Connection = todo!();
589 /// #
590 /// let stream = connection.open_send_stream().await?;
591 /// println!("New stream's id: {}", stream.id());
592 /// #
593 /// # Ok(())
594 /// # }
595 /// ```
596 #[inline]
597 pub fn id(&self) -> u64 {
598 self.0.id().into()
599 }
600
601 impl_connection_api!(|stream| crate::connection::Handle(stream.0.connection().clone()));
602
603 impl_send_stream_api!(|stream, dispatch| dispatch!(stream.0));
604}
605
606impl_splittable_stream_trait!(SendStream, |stream| (None, Some(stream)));
607impl_send_stream_trait!(SendStream, |stream, dispatch| dispatch!(stream.0));