s2n_quic_core/stream/
ops.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Bulk operations performed on streams
5//!
6//! By representing stream operations as structs, callers can request multiple tasks to be
7//! performed in a single call, which reduces context switching.
8//!
9//! Consider the following scenario where we send 3 chunks of data and finish the stream:
10//!
11//! ```rust,ignore
12//! stream.send(a).await?;
13//! stream.send(b).await?;
14//! stream.send(c).await?;
15//! stream.finish().await?;
16//! ```
17//!
18//! This will result in at least 4 context switches (and potentially even more if the stream
19//! is currently blocked on sending).
20//!
21//! Using the bulk operation API greatly reduces this amount:
22//!
23//! ```rust,ignore
24//! stream
25//!     .request()
26//!     .send(&mut [a, b, c])
27//!     .finish()
28//!     .await?;
29//! ```
30
31use crate::{application, stream};
32use core::task::Poll;
33
34/// A request made on a stream
35#[derive(Default, Debug)]
36pub struct Request<'a> {
37    /// The `tx` options of the request
38    pub tx: Option<tx::Request<'a>>,
39
40    /// The `rx` options of the request
41    pub rx: Option<rx::Request<'a>>,
42}
43
44impl<'a> Request<'a> {
45    /// Requests a slice of chunks to be sent on the tx stream
46    pub fn send(&mut self, chunks: &'a mut [bytes::Bytes]) -> &mut Self {
47        self.tx_mut().chunks = Some(chunks);
48        self
49    }
50
51    /// Resets the tx stream with an error code
52    pub fn reset(&mut self, error: application::Error) -> &mut Self {
53        self.tx_mut().reset = Some(error);
54        self
55    }
56
57    /// Flushes any pending tx data to be ACKed before unblocking
58    pub fn flush(&mut self) -> &mut Self {
59        self.tx_mut().flush = true;
60        self
61    }
62
63    /// Marks the tx stream as finished (e.g. no more data will be sent)
64    pub fn finish(&mut self) -> &mut Self {
65        self.tx_mut().finish = true;
66        self
67    }
68
69    /// Requests data on the rx stream to be received into the provided slice of chunks
70    pub fn receive(&mut self, chunks: &'a mut [bytes::Bytes]) -> &mut Self {
71        self.rx_mut().chunks = Some(chunks);
72        self
73    }
74
75    /// Requests the peer to stop sending data on the rx stream
76    pub fn stop_sending(&mut self, error: application::Error) -> &mut Self {
77        self.rx_mut().stop_sending = Some(error);
78        self
79    }
80
81    /// Sets the watermarks for the rx stream
82    pub fn with_watermark(&mut self, low: usize, high: usize) -> &mut Self {
83        let rx = self.rx_mut();
84        rx.low_watermark = low.min(high);
85        rx.high_watermark = high.max(low);
86        self
87    }
88
89    /// Sets the low watermark for the rx stream
90    ///
91    /// If the watermark is set to `0`, the caller will be notified as soon as there is data
92    /// available on the stream.
93    ///
94    /// If the watermark is greater than `0`, the caller will be notified as soon as there is at
95    /// least `low` bytes available to be read. Note that the stream may be woken earlier.
96    pub fn with_low_watermark(&mut self, low: usize) -> &mut Self {
97        let rx = self.rx_mut();
98        rx.low_watermark = low;
99        // raise the high watermark to be at least the lower
100        rx.high_watermark = rx.high_watermark.max(low);
101        self
102    }
103
104    /// Sets the high watermark for the rx stream
105    ///
106    /// The stream ensures that all the received data will not exceed the watermark amount. This
107    /// can be useful for receiving at most `n` bytes.
108    pub fn with_high_watermark(&mut self, high: usize) -> &mut Self {
109        let rx = self.rx_mut();
110        rx.high_watermark = high;
111        // lower the low watermark to be less than the higher
112        rx.low_watermark = rx.low_watermark.min(high);
113        self
114    }
115
116    pub fn detach_tx(&mut self) -> &mut Self {
117        let tx = self.tx_mut();
118        tx.detached = true;
119        self
120    }
121
122    pub fn detach_rx(&mut self) -> &mut Self {
123        let rx = self.rx_mut();
124        rx.detached = true;
125        self
126    }
127
128    /// Lazily creates and returns the `tx` request
129    fn tx_mut(&mut self) -> &mut tx::Request<'a> {
130        if self.tx.is_none() {
131            self.tx = Some(Default::default());
132        }
133        self.tx.as_mut().expect("tx should always be initialized")
134    }
135
136    /// Lazily creates and returns the `rx` request
137    fn rx_mut(&mut self) -> &mut rx::Request<'a> {
138        if self.rx.is_none() {
139            self.rx = Some(Default::default());
140        }
141        self.rx.as_mut().expect("rx should always be initialized")
142    }
143}
144
145/// A response received after executing a request
146#[derive(Debug, Default, PartialEq, Eq)]
147pub struct Response {
148    /// The `tx` information of the response
149    pub tx: Option<tx::Response>,
150
151    /// The `rx` information of the response
152    pub rx: Option<rx::Response>,
153}
154
155impl Response {
156    /// Returns `true` if either the `rx` or `tx` requests will wake the provided waker at a later
157    /// point in time.
158    pub fn is_pending(&self) -> bool {
159        self.tx.iter().any(|tx| tx.is_pending()) || self.rx.iter().any(|rx| rx.is_pending())
160    }
161
162    /// Returns the `tx` response
163    pub fn tx(&self) -> Option<&tx::Response> {
164        self.tx.as_ref()
165    }
166
167    /// Returns the `rx` response
168    pub fn rx(&self) -> Option<&rx::Response> {
169        self.rx.as_ref()
170    }
171}
172
173/// Request and response related to transmitting on a stream
174pub mod tx {
175    use super::*;
176
177    /// A request on a `tx` stream
178    #[derive(Default, Debug)]
179    pub struct Request<'a> {
180        /// Optionally transmit chunks onto the stream
181        ///
182        /// The chunks will be replaced with empty buffers as they are stored in the transmission
183        /// buffer. The response will indicate how many chunks and bytes were consumed from
184        /// this slice.
185        pub chunks: Option<&'a mut [bytes::Bytes]>,
186
187        /// Optionally reset the stream with an error
188        pub reset: Option<application::Error>,
189
190        /// Waits for an ACK on resets and finishes
191        pub flush: bool,
192
193        /// Marks the tx stream as finished (e.g. no more data will be sent)
194        pub finish: bool,
195
196        /// Marks the tx stream as detached, which makes the stream make progress, regardless of
197        /// application observations.
198        pub detached: bool,
199    }
200
201    /// The result of a tx request
202    #[derive(Debug, PartialEq, Eq)]
203    pub struct Response {
204        /// Information about the bytes that were sent
205        pub bytes: Bytes,
206
207        /// Information about the chunks that were sent
208        pub chunks: Chunks,
209
210        /// Indicates if the operation resulted in storing the provided waker to notify when the
211        /// request may be polled again.
212        pub will_wake: bool,
213
214        /// The current status of the stream
215        pub status: Status,
216    }
217
218    impl Default for Response {
219        fn default() -> Self {
220            Self {
221                bytes: Bytes::default(),
222                chunks: Chunks::default(),
223                will_wake: false,
224                status: Status::Open,
225            }
226        }
227    }
228
229    impl Response {
230        /// Returns true if provided waker will be woken
231        pub fn is_pending(&self) -> bool {
232            self.will_wake
233        }
234
235        /// Returns the `tx` response
236        pub fn tx(&self) -> Option<&Self> {
237            Some(self)
238        }
239    }
240}
241
242/// Request and response related to receiving on a stream
243pub mod rx {
244    use super::*;
245
246    /// A request on a `rx` stream
247    #[derive(Debug)]
248    pub struct Request<'a> {
249        /// Optionally receive chunks from the stream
250        ///
251        /// At least one of the provided chunks should be empty, as it will be replaced by the
252        /// received data from the stream. The response will indicate how many chunks and
253        /// bytes were consumed from the stream into the provided slice.
254        pub chunks: Option<&'a mut [bytes::Bytes]>,
255
256        /// Sets the low watermark for the rx stream
257        ///
258        /// If the watermark is set to `0`, the caller will be notified as soon as there is data
259        /// available on the stream.
260        ///
261        /// If the watermark is greater than `0`, the caller will be notified as soon as there is at
262        /// least `low` bytes available to be read. Note that the stream may be woken earlier.
263        pub low_watermark: usize,
264
265        /// Sets the high watermark for the rx stream
266        ///
267        /// The stream ensures that all the received data will not exceed the watermark amount. This
268        /// can be useful for receiving at most `n` bytes.
269        pub high_watermark: usize,
270
271        /// Optionally requests the peer to stop sending data with an error
272        pub stop_sending: Option<application::Error>,
273
274        /// Marks the rx stream as detached, which makes the stream make progress, regardless of
275        /// application observations.
276        pub detached: bool,
277    }
278
279    impl Default for Request<'_> {
280        fn default() -> Self {
281            Self {
282                chunks: None,
283                low_watermark: 0,
284                high_watermark: usize::MAX,
285                stop_sending: None,
286                detached: false,
287            }
288        }
289    }
290
291    /// The result of a pop operation
292    #[derive(Debug, PartialEq, Eq)]
293    pub struct Response {
294        /// Information about the bytes that were received
295        pub bytes: Bytes,
296
297        /// Information about the chunks that were received
298        pub chunks: Chunks,
299
300        /// Indicates if the operation resulted in storing the provided waker to notify when the
301        /// request may be polled again.
302        pub will_wake: bool,
303
304        /// The current status of the stream
305        pub status: Status,
306    }
307
308    impl Default for Response {
309        fn default() -> Self {
310            Self {
311                bytes: Bytes::default(),
312                chunks: Chunks::default(),
313                will_wake: false,
314                status: Status::Open,
315            }
316        }
317    }
318
319    impl Response {
320        /// Returns true if provided waker will be woken
321        pub fn is_pending(&self) -> bool {
322            self.will_wake
323        }
324
325        /// Returns the `rx` response
326        pub fn rx(&self) -> Option<&Self> {
327            Some(self)
328        }
329    }
330}
331
332#[derive(Debug, Default, PartialEq, Eq)]
333pub struct Bytes {
334    /// The number of bytes that were consumed by the operation.
335    ///
336    /// In the case of `tx` operations, this is the number of bytes that were sent on the
337    /// stream.
338    ///
339    /// In the case of `rx` operations, this is the number of bytes that were received from the
340    /// stream.
341    pub consumed: usize,
342
343    /// The number of bytes that are available on the stream.
344    ///
345    /// In the case of `tx` operations, this is the number of additional bytes that can be sent
346    /// in the stream. Note that this is not a hard limit on accepting a chunk of data.
347    ///
348    /// In the case of `rx` operations, this is the number of additional bytes that can be received
349    /// from the stream.
350    pub available: usize,
351}
352
353#[derive(Debug, Default, PartialEq, Eq)]
354pub struct Chunks {
355    /// The number of chunks that were consumed by the operation.
356    ///
357    /// In the case of `tx` operations, this is the number of chunks that were sent on the
358    /// stream.
359    ///
360    /// In the case of `rx` operations, this is the number of chunks that were received from the
361    /// stream.
362    pub consumed: usize,
363
364    /// The number of chunks that are available on the stream.
365    ///
366    /// In the case of `tx` operations, this is the number of additional chunks that can be sent
367    /// in the stream. This value will be based on the assumption of 1 byte chunks and will
368    /// contain the same value of `bytes.available`.
369    ///
370    /// In the case of `rx` operations, this is the number of additional chunks that can be received
371    /// from the stream.
372    pub available: usize,
373}
374
375#[derive(Copy, Clone, Debug, PartialEq, Eq)]
376pub enum Status {
377    /// The stream is open and writable
378    Open,
379
380    /// The stream is finishing but still has data to be flushed
381    Finishing,
382
383    /// The stream is finished and completely flushed
384    Finished,
385
386    /// The stream has been reset locally but has not been acknowledged by the peer
387    Resetting,
388
389    /// The stream was reset either by the peer or locally
390    Reset(stream::StreamError),
391}
392
393macro_rules! impl_status {
394    (| $self:ident | $value:expr) => {
395        /// Returns `true` if the status is `Open`
396        pub fn is_open(&self) -> bool {
397            matches!(self.status(), Status::Open)
398        }
399
400        /// Returns `true` if the status is `Finishing`
401        pub fn is_finishing(&self) -> bool {
402            matches!(self.status(), Status::Finishing)
403        }
404
405        /// Returns `true` if the status is `Finished`
406        pub fn is_finished(&self) -> bool {
407            matches!(self.status(), Status::Finished)
408        }
409
410        /// Returns `true` if the status is `Resetting`
411        pub fn is_resetting(&self) -> bool {
412            matches!(self.status(), Status::Resetting)
413        }
414
415        /// Returns `true` if the status is `Reset`
416        pub fn is_reset(&self) -> bool {
417            matches!(self.status(), Status::Reset(_))
418        }
419
420        /// Returns `true` if the status is `Finishing` or `Resetting`
421        pub fn is_closing(&self) -> bool {
422            self.is_finishing() || self.is_resetting()
423        }
424
425        /// Returns `true` if the status is `Finished` or `Reset`
426        pub fn is_closed(&self) -> bool {
427            self.is_finished() || self.is_reset()
428        }
429
430        const fn status(&$self) -> Status {
431            $value
432        }
433    };
434}
435
436impl Status {
437    impl_status!(|self| *self);
438}
439
440impl rx::Response {
441    impl_status!(|self| self.status);
442}
443
444impl tx::Response {
445    impl_status!(|self| self.status);
446}
447
448macro_rules! conversions {
449    ($name:path) => {
450        impl $name {
451            /// Converts the response into a `Poll<Self>`
452            pub fn into_poll(self) -> Poll<Self> {
453                if self.is_pending() {
454                    Poll::Pending
455                } else {
456                    Poll::Ready(self)
457                }
458            }
459        }
460
461        impl From<$name> for () {
462            fn from(_: $name) {}
463        }
464
465        impl<T, E> From<$name> for Poll<Result<T, E>>
466        where
467            $name: Into<T>,
468        {
469            fn from(v: $name) -> Poll<Result<T, E>> {
470                if v.is_pending() {
471                    Poll::Pending
472                } else {
473                    Poll::Ready(Ok(v.into()))
474                }
475            }
476        }
477    };
478}
479
480conversions!(Response);
481conversions!(tx::Response);
482conversions!(rx::Response);
483
484#[cfg(test)]
485mod tests {
486    use super::*;
487
488    #[test]
489    fn request_builder_test() {
490        let mut request = Request::default();
491        let mut send_chunks = [bytes::Bytes::from_static(&[1])];
492        let mut receive_chunks = [
493            bytes::Bytes::from_static(&[2]),
494            bytes::Bytes::from_static(&[3]),
495        ];
496
497        request
498            .send(&mut send_chunks)
499            .finish()
500            .flush()
501            .reset(application::Error::new(1).unwrap())
502            .receive(&mut receive_chunks)
503            .with_watermark(5, 10)
504            .stop_sending(application::Error::new(2).unwrap());
505
506        assert!(matches!(
507            request,
508            Request {
509                tx: Some(tx::Request {
510                    chunks: Some(tx_chunks),
511                    finish: true,
512                    flush: true,
513                    reset: Some(reset),
514                    detached: false,
515                }),
516                rx: Some(rx::Request {
517                    chunks: Some(rx_chunks),
518                    low_watermark: 5,
519                    high_watermark: 10,
520                    stop_sending: Some(stop_sending),
521                    detached: false,
522                })
523            } if reset == application::Error::new(1).unwrap()
524              && stop_sending == application::Error::new(2).unwrap()
525              && tx_chunks.len() == 1
526              && rx_chunks.len() == 2
527        ));
528    }
529
530    #[test]
531    fn response_pending_test() {
532        for rx_pending in [false, true] {
533            for tx_pending in [false, true] {
534                let response = Response {
535                    tx: Some(tx::Response {
536                        will_wake: tx_pending,
537                        ..Default::default()
538                    }),
539                    rx: Some(rx::Response {
540                        will_wake: rx_pending,
541                        ..Default::default()
542                    }),
543                };
544
545                assert_eq!(response.is_pending(), rx_pending || tx_pending);
546
547                if rx_pending || tx_pending {
548                    assert_eq!(response.into_poll(), Poll::Pending);
549                } else {
550                    assert_eq!(
551                        response.into_poll(),
552                        Poll::Ready(Response {
553                            tx: Some(tx::Response::default()),
554                            rx: Some(rx::Response::default()),
555                        })
556                    );
557                }
558            }
559        }
560    }
561}