s2n_quic_core/stream/ops.rs
1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Bulk operations performed on streams
5//!
6//! By representing stream operations as structs, callers can request multiple tasks to be
7//! performed in a single call, which reduces context switching.
8//!
9//! Consider the following scenario where we send 3 chunks of data and finish the stream:
10//!
11//! ```rust,ignore
12//! stream.send(a).await?;
13//! stream.send(b).await?;
14//! stream.send(c).await?;
15//! stream.finish().await?;
16//! ```
17//!
18//! This will result in at least 4 context switches (and potentially even more if the stream
19//! is currently blocked on sending).
20//!
21//! Using the bulk operation API greatly reduces this amount:
22//!
23//! ```rust,ignore
24//! stream
25//! .request()
26//! .send(&mut [a, b, c])
27//! .finish()
28//! .await?;
29//! ```
30
31use crate::{application, stream};
32use core::task::Poll;
33
34/// A request made on a stream
35#[derive(Default, Debug)]
36pub struct Request<'a> {
37 /// The `tx` options of the request
38 pub tx: Option<tx::Request<'a>>,
39
40 /// The `rx` options of the request
41 pub rx: Option<rx::Request<'a>>,
42}
43
44impl<'a> Request<'a> {
45 /// Requests a slice of chunks to be sent on the tx stream
46 pub fn send(&mut self, chunks: &'a mut [bytes::Bytes]) -> &mut Self {
47 self.tx_mut().chunks = Some(chunks);
48 self
49 }
50
51 /// Resets the tx stream with an error code
52 pub fn reset(&mut self, error: application::Error) -> &mut Self {
53 self.tx_mut().reset = Some(error);
54 self
55 }
56
57 /// Flushes any pending tx data to be ACKed before unblocking
58 pub fn flush(&mut self) -> &mut Self {
59 self.tx_mut().flush = true;
60 self
61 }
62
63 /// Marks the tx stream as finished (e.g. no more data will be sent)
64 pub fn finish(&mut self) -> &mut Self {
65 self.tx_mut().finish = true;
66 self
67 }
68
69 /// Requests data on the rx stream to be received into the provided slice of chunks
70 pub fn receive(&mut self, chunks: &'a mut [bytes::Bytes]) -> &mut Self {
71 self.rx_mut().chunks = Some(chunks);
72 self
73 }
74
75 /// Requests the peer to stop sending data on the rx stream
76 pub fn stop_sending(&mut self, error: application::Error) -> &mut Self {
77 self.rx_mut().stop_sending = Some(error);
78 self
79 }
80
81 /// Sets the watermarks for the rx stream
82 pub fn with_watermark(&mut self, low: usize, high: usize) -> &mut Self {
83 let rx = self.rx_mut();
84 rx.low_watermark = low.min(high);
85 rx.high_watermark = high.max(low);
86 self
87 }
88
89 /// Sets the low watermark for the rx stream
90 ///
91 /// If the watermark is set to `0`, the caller will be notified as soon as there is data
92 /// available on the stream.
93 ///
94 /// If the watermark is greater than `0`, the caller will be notified as soon as there is at
95 /// least `low` bytes available to be read. Note that the stream may be woken earlier.
96 pub fn with_low_watermark(&mut self, low: usize) -> &mut Self {
97 let rx = self.rx_mut();
98 rx.low_watermark = low;
99 // raise the high watermark to be at least the lower
100 rx.high_watermark = rx.high_watermark.max(low);
101 self
102 }
103
104 /// Sets the high watermark for the rx stream
105 ///
106 /// The stream ensures that all the received data will not exceed the watermark amount. This
107 /// can be useful for receiving at most `n` bytes.
108 pub fn with_high_watermark(&mut self, high: usize) -> &mut Self {
109 let rx = self.rx_mut();
110 rx.high_watermark = high;
111 // lower the low watermark to be less than the higher
112 rx.low_watermark = rx.low_watermark.min(high);
113 self
114 }
115
116 pub fn detach_tx(&mut self) -> &mut Self {
117 let tx = self.tx_mut();
118 tx.detached = true;
119 self
120 }
121
122 pub fn detach_rx(&mut self) -> &mut Self {
123 let rx = self.rx_mut();
124 rx.detached = true;
125 self
126 }
127
128 /// Lazily creates and returns the `tx` request
129 fn tx_mut(&mut self) -> &mut tx::Request<'a> {
130 if self.tx.is_none() {
131 self.tx = Some(Default::default());
132 }
133 self.tx.as_mut().expect("tx should always be initialized")
134 }
135
136 /// Lazily creates and returns the `rx` request
137 fn rx_mut(&mut self) -> &mut rx::Request<'a> {
138 if self.rx.is_none() {
139 self.rx = Some(Default::default());
140 }
141 self.rx.as_mut().expect("rx should always be initialized")
142 }
143}
144
145/// A response received after executing a request
146#[derive(Debug, Default, PartialEq, Eq)]
147pub struct Response {
148 /// The `tx` information of the response
149 pub tx: Option<tx::Response>,
150
151 /// The `rx` information of the response
152 pub rx: Option<rx::Response>,
153}
154
155impl Response {
156 /// Returns `true` if either the `rx` or `tx` requests will wake the provided waker at a later
157 /// point in time.
158 pub fn is_pending(&self) -> bool {
159 self.tx.iter().any(|tx| tx.is_pending()) || self.rx.iter().any(|rx| rx.is_pending())
160 }
161
162 /// Returns the `tx` response
163 pub fn tx(&self) -> Option<&tx::Response> {
164 self.tx.as_ref()
165 }
166
167 /// Returns the `rx` response
168 pub fn rx(&self) -> Option<&rx::Response> {
169 self.rx.as_ref()
170 }
171}
172
173/// Request and response related to transmitting on a stream
174pub mod tx {
175 use super::*;
176
177 /// A request on a `tx` stream
178 #[derive(Default, Debug)]
179 pub struct Request<'a> {
180 /// Optionally transmit chunks onto the stream
181 ///
182 /// The chunks will be replaced with empty buffers as they are stored in the transmission
183 /// buffer. The response will indicate how many chunks and bytes were consumed from
184 /// this slice.
185 pub chunks: Option<&'a mut [bytes::Bytes]>,
186
187 /// Optionally reset the stream with an error
188 pub reset: Option<application::Error>,
189
190 /// Waits for an ACK on resets and finishes
191 pub flush: bool,
192
193 /// Marks the tx stream as finished (e.g. no more data will be sent)
194 pub finish: bool,
195
196 /// Marks the tx stream as detached, which makes the stream make progress, regardless of
197 /// application observations.
198 pub detached: bool,
199 }
200
201 /// The result of a tx request
202 #[derive(Debug, PartialEq, Eq)]
203 pub struct Response {
204 /// Information about the bytes that were sent
205 pub bytes: Bytes,
206
207 /// Information about the chunks that were sent
208 pub chunks: Chunks,
209
210 /// Indicates if the operation resulted in storing the provided waker to notify when the
211 /// request may be polled again.
212 pub will_wake: bool,
213
214 /// The current status of the stream
215 pub status: Status,
216 }
217
218 impl Default for Response {
219 fn default() -> Self {
220 Self {
221 bytes: Bytes::default(),
222 chunks: Chunks::default(),
223 will_wake: false,
224 status: Status::Open,
225 }
226 }
227 }
228
229 impl Response {
230 /// Returns true if provided waker will be woken
231 pub fn is_pending(&self) -> bool {
232 self.will_wake
233 }
234
235 /// Returns the `tx` response
236 pub fn tx(&self) -> Option<&Self> {
237 Some(self)
238 }
239 }
240}
241
242/// Request and response related to receiving on a stream
243pub mod rx {
244 use super::*;
245
246 /// A request on a `rx` stream
247 #[derive(Debug)]
248 pub struct Request<'a> {
249 /// Optionally receive chunks from the stream
250 ///
251 /// At least one of the provided chunks should be empty, as it will be replaced by the
252 /// received data from the stream. The response will indicate how many chunks and
253 /// bytes were consumed from the stream into the provided slice.
254 pub chunks: Option<&'a mut [bytes::Bytes]>,
255
256 /// Sets the low watermark for the rx stream
257 ///
258 /// If the watermark is set to `0`, the caller will be notified as soon as there is data
259 /// available on the stream.
260 ///
261 /// If the watermark is greater than `0`, the caller will be notified as soon as there is at
262 /// least `low` bytes available to be read. Note that the stream may be woken earlier.
263 pub low_watermark: usize,
264
265 /// Sets the high watermark for the rx stream
266 ///
267 /// The stream ensures that all the received data will not exceed the watermark amount. This
268 /// can be useful for receiving at most `n` bytes.
269 pub high_watermark: usize,
270
271 /// Optionally requests the peer to stop sending data with an error
272 pub stop_sending: Option<application::Error>,
273
274 /// Marks the rx stream as detached, which makes the stream make progress, regardless of
275 /// application observations.
276 pub detached: bool,
277 }
278
279 impl Default for Request<'_> {
280 fn default() -> Self {
281 Self {
282 chunks: None,
283 low_watermark: 0,
284 high_watermark: usize::MAX,
285 stop_sending: None,
286 detached: false,
287 }
288 }
289 }
290
291 /// The result of a pop operation
292 #[derive(Debug, PartialEq, Eq)]
293 pub struct Response {
294 /// Information about the bytes that were received
295 pub bytes: Bytes,
296
297 /// Information about the chunks that were received
298 pub chunks: Chunks,
299
300 /// Indicates if the operation resulted in storing the provided waker to notify when the
301 /// request may be polled again.
302 pub will_wake: bool,
303
304 /// The current status of the stream
305 pub status: Status,
306 }
307
308 impl Default for Response {
309 fn default() -> Self {
310 Self {
311 bytes: Bytes::default(),
312 chunks: Chunks::default(),
313 will_wake: false,
314 status: Status::Open,
315 }
316 }
317 }
318
319 impl Response {
320 /// Returns true if provided waker will be woken
321 pub fn is_pending(&self) -> bool {
322 self.will_wake
323 }
324
325 /// Returns the `rx` response
326 pub fn rx(&self) -> Option<&Self> {
327 Some(self)
328 }
329 }
330}
331
332#[derive(Debug, Default, PartialEq, Eq)]
333pub struct Bytes {
334 /// The number of bytes that were consumed by the operation.
335 ///
336 /// In the case of `tx` operations, this is the number of bytes that were sent on the
337 /// stream.
338 ///
339 /// In the case of `rx` operations, this is the number of bytes that were received from the
340 /// stream.
341 pub consumed: usize,
342
343 /// The number of bytes that are available on the stream.
344 ///
345 /// In the case of `tx` operations, this is the number of additional bytes that can be sent
346 /// in the stream. Note that this is not a hard limit on accepting a chunk of data.
347 ///
348 /// In the case of `rx` operations, this is the number of additional bytes that can be received
349 /// from the stream.
350 pub available: usize,
351}
352
353#[derive(Debug, Default, PartialEq, Eq)]
354pub struct Chunks {
355 /// The number of chunks that were consumed by the operation.
356 ///
357 /// In the case of `tx` operations, this is the number of chunks that were sent on the
358 /// stream.
359 ///
360 /// In the case of `rx` operations, this is the number of chunks that were received from the
361 /// stream.
362 pub consumed: usize,
363
364 /// The number of chunks that are available on the stream.
365 ///
366 /// In the case of `tx` operations, this is the number of additional chunks that can be sent
367 /// in the stream. This value will be based on the assumption of 1 byte chunks and will
368 /// contain the same value of `bytes.available`.
369 ///
370 /// In the case of `rx` operations, this is the number of additional chunks that can be received
371 /// from the stream.
372 pub available: usize,
373}
374
375#[derive(Copy, Clone, Debug, PartialEq, Eq)]
376pub enum Status {
377 /// The stream is open and writable
378 Open,
379
380 /// The stream is finishing but still has data to be flushed
381 Finishing,
382
383 /// The stream is finished and completely flushed
384 Finished,
385
386 /// The stream has been reset locally but has not been acknowledged by the peer
387 Resetting,
388
389 /// The stream was reset either by the peer or locally
390 Reset(stream::StreamError),
391}
392
393macro_rules! impl_status {
394 (| $self:ident | $value:expr) => {
395 /// Returns `true` if the status is `Open`
396 pub fn is_open(&self) -> bool {
397 matches!(self.status(), Status::Open)
398 }
399
400 /// Returns `true` if the status is `Finishing`
401 pub fn is_finishing(&self) -> bool {
402 matches!(self.status(), Status::Finishing)
403 }
404
405 /// Returns `true` if the status is `Finished`
406 pub fn is_finished(&self) -> bool {
407 matches!(self.status(), Status::Finished)
408 }
409
410 /// Returns `true` if the status is `Resetting`
411 pub fn is_resetting(&self) -> bool {
412 matches!(self.status(), Status::Resetting)
413 }
414
415 /// Returns `true` if the status is `Reset`
416 pub fn is_reset(&self) -> bool {
417 matches!(self.status(), Status::Reset(_))
418 }
419
420 /// Returns `true` if the status is `Finishing` or `Resetting`
421 pub fn is_closing(&self) -> bool {
422 self.is_finishing() || self.is_resetting()
423 }
424
425 /// Returns `true` if the status is `Finished` or `Reset`
426 pub fn is_closed(&self) -> bool {
427 self.is_finished() || self.is_reset()
428 }
429
430 const fn status(&$self) -> Status {
431 $value
432 }
433 };
434}
435
436impl Status {
437 impl_status!(|self| *self);
438}
439
440impl rx::Response {
441 impl_status!(|self| self.status);
442}
443
444impl tx::Response {
445 impl_status!(|self| self.status);
446}
447
448macro_rules! conversions {
449 ($name:path) => {
450 impl $name {
451 /// Converts the response into a `Poll<Self>`
452 pub fn into_poll(self) -> Poll<Self> {
453 if self.is_pending() {
454 Poll::Pending
455 } else {
456 Poll::Ready(self)
457 }
458 }
459 }
460
461 impl From<$name> for () {
462 fn from(_: $name) {}
463 }
464
465 impl<T, E> From<$name> for Poll<Result<T, E>>
466 where
467 $name: Into<T>,
468 {
469 fn from(v: $name) -> Poll<Result<T, E>> {
470 if v.is_pending() {
471 Poll::Pending
472 } else {
473 Poll::Ready(Ok(v.into()))
474 }
475 }
476 }
477 };
478}
479
480conversions!(Response);
481conversions!(tx::Response);
482conversions!(rx::Response);
483
484#[cfg(test)]
485mod tests {
486 use super::*;
487
488 #[test]
489 fn request_builder_test() {
490 let mut request = Request::default();
491 let mut send_chunks = [bytes::Bytes::from_static(&[1])];
492 let mut receive_chunks = [
493 bytes::Bytes::from_static(&[2]),
494 bytes::Bytes::from_static(&[3]),
495 ];
496
497 request
498 .send(&mut send_chunks)
499 .finish()
500 .flush()
501 .reset(application::Error::new(1).unwrap())
502 .receive(&mut receive_chunks)
503 .with_watermark(5, 10)
504 .stop_sending(application::Error::new(2).unwrap());
505
506 assert!(matches!(
507 request,
508 Request {
509 tx: Some(tx::Request {
510 chunks: Some(tx_chunks),
511 finish: true,
512 flush: true,
513 reset: Some(reset),
514 detached: false,
515 }),
516 rx: Some(rx::Request {
517 chunks: Some(rx_chunks),
518 low_watermark: 5,
519 high_watermark: 10,
520 stop_sending: Some(stop_sending),
521 detached: false,
522 })
523 } if reset == application::Error::new(1).unwrap()
524 && stop_sending == application::Error::new(2).unwrap()
525 && tx_chunks.len() == 1
526 && rx_chunks.len() == 2
527 ));
528 }
529
530 #[test]
531 fn response_pending_test() {
532 for rx_pending in [false, true] {
533 for tx_pending in [false, true] {
534 let response = Response {
535 tx: Some(tx::Response {
536 will_wake: tx_pending,
537 ..Default::default()
538 }),
539 rx: Some(rx::Response {
540 will_wake: rx_pending,
541 ..Default::default()
542 }),
543 };
544
545 assert_eq!(response.is_pending(), rx_pending || tx_pending);
546
547 if rx_pending || tx_pending {
548 assert_eq!(response.into_poll(), Poll::Pending);
549 } else {
550 assert_eq!(
551 response.into_poll(),
552 Poll::Ready(Response {
553 tx: Some(tx::Response::default()),
554 rx: Some(rx::Response::default()),
555 })
556 );
557 }
558 }
559 }
560 }
561}