Skip to main content

aws_runtime/
content_encoding.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use aws_smithy_types::config_bag::{Storable, StoreReplace};
7use bytes::{Bytes, BytesMut};
8use pin_project_lite::pin_project;
9
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
/// Line terminator used by the `aws-chunked` framing, as text.
const CRLF: &str = "\r\n";
/// Byte form of [`CRLF`], for appending directly to byte buffers.
const CRLF_RAW: &[u8] = CRLF.as_bytes();

/// The zero-sized chunk line that marks the end of the chunk data, as text.
const CHUNK_TERMINATOR: &str = "0\r\n";
/// Byte form of [`CHUNK_TERMINATOR`], for appending directly to byte buffers.
const CHUNK_TERMINATOR_RAW: &[u8] = CHUNK_TERMINATOR.as_bytes();

/// Separator written between a trailer name and its value (a bare colon, no whitespace).
const TRAILER_SEPARATOR: &[u8] = b":";
20
/// Constants for values of the `Content-Encoding` header.
pub mod header_value {
    /// Header value denoting "aws-chunked" encoding, i.e. a body framed the way
    /// `AwsChunkedBody` in the parent module writes it.
    pub const AWS_CHUNKED: &str = "aws-chunked";
}
26
/// Options used when constructing an [`AwsChunkedBody`].
///
/// The lengths recorded here are what [`AwsChunkedBodyOptions::encoded_length`] uses to compute
/// the size of the encoded body, and the body implementation checks them against what the
/// wrapped body actually produces.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct AwsChunkedBodyOptions {
    /// The total size of the stream. Because we only support unsigned encoding
    /// this implies that there will only be a single chunk containing the
    /// underlying payload.
    stream_length: u64,
    /// The length of each trailer sent within an `AwsChunkedBody`, not counting the CRLF
    /// that follows it. Necessary in order to calculate the total size of the body.
    trailer_lengths: Vec<u64>,
    /// Whether the aws-chunked encoding is disabled. This could occur, for instance,
    /// if a user specifies a custom checksum, rendering aws-chunked encoding unnecessary.
    disabled: bool,
}
42
// Lets the options be kept in a smithy config bag, using the `StoreReplace` storer.
impl Storable for AwsChunkedBodyOptions {
    type Storer = StoreReplace<Self>;
}
46
47impl AwsChunkedBodyOptions {
48    /// Create a new [`AwsChunkedBodyOptions`].
49    pub fn new(stream_length: u64, trailer_lengths: Vec<u64>) -> Self {
50        Self {
51            stream_length,
52            trailer_lengths,
53            disabled: false,
54        }
55    }
56
57    fn total_trailer_length(&self) -> u64 {
58        self.trailer_lengths.iter().sum::<u64>()
59            // We need to account for a CRLF after each trailer name/value pair
60            + (self.trailer_lengths.len() * CRLF.len()) as u64
61    }
62
63    /// Set the stream length in the options
64    pub fn with_stream_length(mut self, stream_length: u64) -> Self {
65        self.stream_length = stream_length;
66        self
67    }
68
69    /// Append a trailer length to the options
70    pub fn with_trailer_len(mut self, trailer_len: u64) -> Self {
71        self.trailer_lengths.push(trailer_len);
72        self
73    }
74
75    /// Create a new [`AwsChunkedBodyOptions`] with aws-chunked encoding disabled.
76    ///
77    /// When the option is disabled, the body must not be wrapped in an `AwsChunkedBody`.
78    pub fn disable_chunked_encoding() -> Self {
79        Self {
80            disabled: true,
81            ..Default::default()
82        }
83    }
84
85    /// Return whether aws-chunked encoding is disabled.
86    pub fn disabled(&self) -> bool {
87        self.disabled
88    }
89
90    /// Return the length of the body after `aws-chunked` encoding is applied
91    pub fn encoded_length(&self) -> u64 {
92        let mut length = 0;
93        if self.stream_length != 0 {
94            length += get_unsigned_chunk_bytes_length(self.stream_length);
95        }
96
97        // End chunk
98        length += CHUNK_TERMINATOR.len() as u64;
99
100        // Trailers
101        for len in self.trailer_lengths.iter() {
102            length += len + CRLF.len() as u64;
103        }
104
105        // Encoding terminator
106        length += CRLF.len() as u64;
107
108        length
109    }
110}
111
/// The phases an `AwsChunkedBody` moves through while it is being polled.
#[derive(Debug, PartialEq, Eq)]
enum AwsChunkedBodyState {
    /// Write out the size of the chunk that will follow. Then, transition into the
    /// `WritingChunk` state. If the stream length is zero, the chunk terminator is written
    /// instead and the next state is `WritingTrailers`.
    WritingChunkSize,
    /// Write out the next chunk of data. Multiple polls of the inner body may need to occur before
    /// all data is written out. Once there is no more data to write, transition into the
    /// `WritingTrailers` state.
    WritingChunk,
    /// Write out all trailers associated with this `AwsChunkedBody` and then transition into the
    /// `Closed` state.
    WritingTrailers,
    /// This is the final state. The closing CRLF is written as part of the transition into this
    /// state; once here, polling yields nothing further.
    Closed,
}
127
pin_project! {
    /// A request body compatible with `Content-Encoding: aws-chunked`. This implementation is only
    /// capable of writing a single chunk and does not support signed chunks.
    ///
    /// Chunked-Body grammar is defined in [ABNF] as:
    ///
    /// ```txt
    /// Chunked-Body    = *chunk
    ///                   last-chunk
    ///                   chunked-trailer
    ///                   CRLF
    ///
    /// chunk           = chunk-size CRLF chunk-data CRLF
    /// chunk-size      = 1*HEXDIG
    /// last-chunk      = 1*("0") CRLF
    /// chunked-trailer = *( entity-header CRLF )
    /// entity-header   = field-name ":" OWS field-value OWS
    /// ```
    /// For more info on what the abbreviations mean, see
    /// <https://datatracker.ietf.org/doc/html/rfc7230#section-1.2>
    ///
    /// [ABNF]:https://en.wikipedia.org/wiki/Augmented_Backus%E2%80%93Naur_form
    #[derive(Debug)]
    pub struct AwsChunkedBody<InnerBody> {
        // The wrapped body whose data and trailers are re-framed.
        #[pin]
        inner: InnerBody,
        // Which part of the encoded body is written on the next poll.
        #[pin]
        state: AwsChunkedBodyState,
        // Declared stream and trailer lengths; checked against what `inner` actually yields.
        options: AwsChunkedBodyOptions,
        // Running total of data bytes received from `inner`, compared against
        // `options.stream_length` once `inner` has no more data.
        inner_body_bytes_read_so_far: usize,
    }
}
159
160impl<Inner> AwsChunkedBody<Inner> {
161    /// Wrap the given body in an outer body compatible with `Content-Encoding: aws-chunked`
162    pub fn new(body: Inner, options: AwsChunkedBodyOptions) -> Self {
163        Self {
164            inner: body,
165            state: AwsChunkedBodyState::WritingChunkSize,
166            options,
167            inner_body_bytes_read_so_far: 0,
168        }
169    }
170}
171
#[cfg(feature = "http-02x")]
impl<Inner> http_body_04x::Body for AwsChunkedBody<Inner>
where
    Inner: http_body_04x::Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
{
    type Data = Bytes;
    type Error = aws_smithy_types::body::Error;

    /// Produce the next piece of the aws-chunked encoded body.
    ///
    /// Each call advances the state machine by at most one step:
    /// chunk size -> chunk data (possibly many polls) -> trailers + closing CRLF -> end.
    ///
    /// # Errors
    ///
    /// - `StreamLengthMismatch` if the inner body ends after yielding a number of bytes that
    ///   differs from the declared stream length.
    /// - `ReportedTrailerLengthMismatch` if the rendered trailers do not have the total length
    ///   that was declared in the options.
    /// - Any error returned by the inner body is passed through unchanged.
    fn poll_data(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
        tracing::trace!(state = ?self.state, "polling AwsChunkedBody");
        let mut this = self.project();

        match *this.state {
            AwsChunkedBodyState::WritingChunkSize => {
                if this.options.stream_length == 0 {
                    // If the stream is empty, we skip to writing trailers after writing the CHUNK_TERMINATOR.
                    *this.state = AwsChunkedBodyState::WritingTrailers;
                    tracing::trace!("stream is empty, writing chunk terminator");
                    Poll::Ready(Some(Ok(Bytes::from(CHUNK_TERMINATOR))))
                } else {
                    *this.state = AwsChunkedBodyState::WritingChunk;
                    // A chunk must be prefixed by chunk size in hexadecimal
                    let chunk_size = format!("{:X}{CRLF}", this.options.stream_length);
                    tracing::trace!(%chunk_size, "writing chunk size");
                    Poll::Ready(Some(Ok(Bytes::from(chunk_size))))
                }
            }
            AwsChunkedBodyState::WritingChunk => match this.inner.poll_data(cx) {
                Poll::Ready(Some(Ok(data))) => {
                    tracing::trace!(len = data.len(), "writing chunk data");
                    *this.inner_body_bytes_read_so_far += data.len();
                    Poll::Ready(Some(Ok(data)))
                }
                Poll::Ready(None) => {
                    // The inner body is exhausted: the byte count must match what was declared,
                    // otherwise the chunk size line already written would be wrong.
                    let actual_stream_length = *this.inner_body_bytes_read_so_far as u64;
                    let expected_stream_length = this.options.stream_length;
                    if actual_stream_length != expected_stream_length {
                        let err = Box::new(AwsChunkedBodyError::StreamLengthMismatch {
                            actual: actual_stream_length,
                            expected: expected_stream_length,
                        });
                        return Poll::Ready(Some(Err(err)));
                    };

                    tracing::trace!("no more chunk data, writing CRLF and chunk terminator");
                    *this.state = AwsChunkedBodyState::WritingTrailers;
                    // Since we wrote chunk data, we end it with a CRLF and since we only write
                    // a single chunk, we write the CHUNK_TERMINATOR immediately after
                    Poll::Ready(Some(Ok(Bytes::from([CRLF, CHUNK_TERMINATOR].concat()))))
                }
                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
                Poll::Pending => Poll::Pending,
            },
            AwsChunkedBodyState::WritingTrailers => match this.inner.poll_trailers(cx) {
                Poll::Ready(Ok(trailers)) => {
                    *this.state = AwsChunkedBodyState::Closed;
                    // `actual` is what the inner body really produced; `expected` is what the
                    // options declared up front. This matches the wording of the error's
                    // `Display` output and the http-1x implementation.
                    // NOTE(review): these two values were previously swapped; any test that
                    // pins the old field values needs updating.
                    let actual_length =
                        http_02x_utils::total_rendered_length_of_trailers(trailers.as_ref());
                    let expected_length = this.options.total_trailer_length();

                    if expected_length != actual_length {
                        let err = Box::new(AwsChunkedBodyError::ReportedTrailerLengthMismatch {
                            actual: actual_length,
                            expected: expected_length,
                        });
                        return Poll::Ready(Some(Err(err)));
                    }

                    // Reserve room for the trailers plus the closing CRLF appended below.
                    let mut trailers = http_02x_utils::trailers_as_aws_chunked_bytes(
                        trailers,
                        actual_length + CRLF.len() as u64,
                    );
                    // Insert the final CRLF to close the body
                    trailers.extend_from_slice(CRLF.as_bytes());

                    Poll::Ready(Some(Ok(trailers.into())))
                }
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
            },
            AwsChunkedBodyState::Closed => Poll::Ready(None),
        }
    }

    /// Always reports no trailers: they are written into the data stream by `poll_data`.
    fn poll_trailers(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Result<Option<http_02x::HeaderMap<http_02x::HeaderValue>>, Self::Error>> {
        // Trailers were already appended to the body because of the content encoding scheme
        Poll::Ready(Ok(None))
    }

    /// The stream has ended once the closing CRLF has been written.
    fn is_end_stream(&self) -> bool {
        self.state == AwsChunkedBodyState::Closed
    }

    /// Exact size of the complete encoded body, as computed from the options.
    fn size_hint(&self) -> http_body_04x::SizeHint {
        http_body_04x::SizeHint::with_exact(self.options.encoded_length())
    }
}
279
/// Utility functions to help with the [http_body_04x::Body] trait implementation
#[cfg(feature = "http-02x")]
mod http_02x_utils {
    use super::{CRLF, TRAILER_SEPARATOR};
    use bytes::BytesMut;
    use http_02x::HeaderMap;

    /// Render trailers into a byte buffer in the form aws-chunked expects.
    ///
    /// - Each line is `name:value` followed by CRLF; the colon has no surrounding space.
    /// - A name with several values is written once per value, one line each.
    ///
    /// `estimated_length` is only used to pre-size the buffer. With no trailer map, an empty
    /// buffer is returned.
    pub(super) fn trailers_as_aws_chunked_bytes(
        trailer_map: Option<HeaderMap>,
        estimated_length: u64,
    ) -> BytesMut {
        let trailer_map = match trailer_map {
            Some(map) => map,
            None => return BytesMut::new(),
        };

        let capacity = usize::try_from(estimated_length).unwrap_or_default();
        let mut rendered = BytesMut::with_capacity(capacity);
        let mut last_seen_name = None;

        for (name, value) in trailer_map {
            // The owning iterator hands out a name only with the first value of each header;
            // the following values of that header arrive with `None`, so remember the last
            // name seen.
            if name.is_some() {
                last_seen_name = name;
            }

            // In practice a name has always been seen by now; skip the value otherwise.
            if let Some(name) = &last_seen_name {
                rendered.extend_from_slice(name.as_str().as_bytes());
                rendered.extend_from_slice(TRAILER_SEPARATOR);
                rendered.extend_from_slice(value.as_bytes());
                rendered.extend_from_slice(CRLF.as_bytes());
            }
        }

        rendered
    }

    /// Number of bytes [`trailers_as_aws_chunked_bytes`] writes for the given trailers, or 0
    /// when there are none.
    ///
    /// Every value contributes the length of its name, the separator, the value itself and
    /// a CRLF.
    pub(super) fn total_rendered_length_of_trailers(trailer_map: Option<&HeaderMap>) -> u64 {
        let rendered_line_length = |name_len: usize, value_len: usize| {
            name_len + TRAILER_SEPARATOR.len() + value_len + CRLF.len()
        };

        trailer_map.map_or(0, |map| {
            map.iter()
                .map(|(name, value)| rendered_line_length(name.as_str().len(), value.len()))
                .sum::<usize>() as u64
        })
    }
}
344
// Panic message for the `unreachable!` arms in `poll_frame`: the `WritingChunkSize` and `Closed`
// states return before the inner body is polled, so they cannot be observed afterwards.
const UNREACHABLE_STATES: &str = "These states already short circuited";
346
347/// Implementing the [http_body_1x::Body] trait
348impl<Inner> http_body_1x::Body for AwsChunkedBody<Inner>
349where
350    Inner: http_body_1x::Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
351{
352    type Data = Bytes;
353    type Error = aws_smithy_types::body::Error;
354
355    fn is_end_stream(&self) -> bool {
356        self.state == AwsChunkedBodyState::Closed
357    }
358
359    fn size_hint(&self) -> http_body_1x::SizeHint {
360        http_body_1x::SizeHint::with_exact(self.options.encoded_length())
361    }
362
363    fn poll_frame(
364        self: Pin<&mut Self>,
365        cx: &mut Context<'_>,
366    ) -> Poll<Option<Result<http_body_1x::Frame<Self::Data>, Self::Error>>> {
367        tracing::trace!(state = ?self.state, "polling AwsChunkedBody");
368        let mut this = self.project();
369
370        // Both `WritingChunkSize` and `Closed` states short circuit without polling the inner body
371
372        // Initial setup, we do not poll the inner body here
373        if *this.state == AwsChunkedBodyState::WritingChunkSize {
374            if this.options.stream_length == 0 {
375                // If the stream is empty, we skip to writing trailers after writing the CHUNK_TERMINATOR.
376                tracing::trace!("stream is empty, writing chunk terminator");
377                let frame = http_body_1x::Frame::data(Bytes::from(CHUNK_TERMINATOR));
378                *this.state = AwsChunkedBodyState::WritingTrailers;
379                return Poll::Ready(Some(Ok(frame)));
380            } else {
381                // A chunk must be prefixed by chunk size in hexadecimal
382                let chunk_size = format!(
383                    "{:X?}{}",
384                    this.options.stream_length,
385                    std::str::from_utf8(CRLF_RAW).unwrap()
386                );
387                tracing::trace!(%chunk_size, "writing chunk size");
388                let chunk_size = http_body_1x::Frame::data(Bytes::from(chunk_size));
389                *this.state = AwsChunkedBodyState::WritingChunk;
390                return Poll::Ready(Some(Ok(chunk_size)));
391            }
392        }
393
394        // Polled after completion
395        if *this.state == AwsChunkedBodyState::Closed {
396            return Poll::Ready(None);
397        }
398
399        // For all other states we must poll the inner body
400        let maybe_frame = this.inner.poll_frame(cx);
401        tracing::trace!(poll_state = ?maybe_frame, "Polling InnerBody");
402
403        match maybe_frame {
404            Poll::Ready(Some(Ok(frame))) => match *this.state {
405                // Both data chunks and trailers are written as Frame::data so we treat these states similarly
406                // Importantly we cannot know that the body data of the InnerBody is exhausted until we see a
407                // trailer frame or a Poll::Ready(None)
408                AwsChunkedBodyState::WritingChunk => {
409                    if frame.is_data() {
410                        let data = frame.data_ref().expect("Data frame has data");
411                        tracing::trace!(len = data.len(), "Writing chunk data");
412                        *this.inner_body_bytes_read_so_far += data.len();
413                        Poll::Ready(Some(Ok(frame)))
414                    } else {
415                        tracing::trace!(
416                            "No more chunk data, writing CRLF + CHUNK_TERMINATOR to end the data, and the first trailer frame"
417                        );
418
419                        // We exhausted the body data, now check if the length is correct
420                        if let Err(poll_stream_len_err) =
421                            http_1x_utils::check_for_stream_length_mismatch(
422                                *this.inner_body_bytes_read_so_far as u64,
423                                this.options.stream_length,
424                            )
425                        {
426                            return poll_stream_len_err;
427                        }
428
429                        *this.state = AwsChunkedBodyState::WritingTrailers;
430                        let trailers = frame.trailers_ref();
431
432                        // NOTE: there is a subtle logic bug here (which is present in the http-02x implementation as well)
433                        // The check for this error assumes that all trailers will come in a single trailer frame. Currently
434                        // I believe this will always be the case since the only thing we send trailers for in AwsChunked is
435                        // streaming checksums and that is a single trailer value. But it might not always be true. We should
436                        // fix this bug when we update the behavior here to match the actual spec.
437                        // The fix probably looks like returning Poll::Pending while we buffer all of the trailers and then
438                        // comparing the actual length to the expected length before returning a final frame containing all
439                        // of the trailers.
440                        let actual_length: u64 =
441                            http_1x_utils::total_rendered_length_of_trailers(trailers);
442                        let expected_length = this.options.total_trailer_length();
443                        if expected_length != actual_length {
444                            let err =
445                                Box::new(AwsChunkedBodyError::ReportedTrailerLengthMismatch {
446                                    actual: actual_length,
447                                    expected: expected_length,
448                                });
449                            return Poll::Ready(Some(Err(err)));
450                        }
451
452                        // Capacity = actual_length (in case all of the trailers specified in  come in AwsChunkedBodyOptions
453                        // come in the first trailer frame which is going to be the case most of the time in practice) + 7
454                        // (2 + 3) for the initial CRLF + CHUNK_TERMINATOR to end the chunked data + 2 for the final CRLF
455                        // ending the trailers section.
456                        let mut buf = BytesMut::with_capacity(actual_length as usize + 7);
457                        // End the final data chunk
458                        buf.extend_from_slice(&[CRLF_RAW, CHUNK_TERMINATOR_RAW].concat());
459
460                        // We transform the trailers into raw bytes. We can't write them with Frame::trailers
461                        // since we must include the CRLF + CHUNK_TERMINATOR that end the body and the CRLFs
462                        // after each trailer, so we write them as Frame::data
463                        let trailers = http_1x_utils::trailers_as_aws_chunked_bytes(trailers, buf);
464                        Poll::Ready(Some(Ok(http_body_1x::Frame::data(trailers.into()))))
465                    }
466                }
467                AwsChunkedBodyState::WritingTrailers => {
468                    let trailers = frame.trailers_ref();
469                    let actual_length: u64 =
470                        http_1x_utils::total_rendered_length_of_trailers(trailers);
471                    let buf = BytesMut::with_capacity(actual_length as usize + 7);
472                    let trailers = http_1x_utils::trailers_as_aws_chunked_bytes(trailers, buf);
473                    Poll::Ready(Some(Ok(http_body_1x::Frame::data(trailers.into()))))
474                }
475                AwsChunkedBodyState::Closed | AwsChunkedBodyState::WritingChunkSize => {
476                    unreachable!("{}", UNREACHABLE_STATES)
477                }
478            },
479            // InnerBody data exhausted, add finalizing bytes depending on current state
480            Poll::Ready(None) => {
481                let trailers = match *this.state {
482                    AwsChunkedBodyState::WritingChunk => {
483                        // We exhausted the body data, now check if the length is correct
484                        if let Err(poll_stream_len_err) =
485                            http_1x_utils::check_for_stream_length_mismatch(
486                                *this.inner_body_bytes_read_so_far as u64,
487                                this.options.stream_length,
488                            )
489                        {
490                            return poll_stream_len_err;
491                        }
492
493                        // Since we exhausted the body data, but are still in the WritingChunk state we did
494                        // not poll any trailer frames and we write the CRLF + Chunk terminator to begin the
495                        // trailer section plus a single final CRLF to end the (empty) trailer section
496                        let mut trailers = BytesMut::with_capacity(7);
497                        trailers.extend_from_slice(
498                            &[CRLF_RAW, CHUNK_TERMINATOR_RAW, CRLF_RAW].concat(),
499                        );
500                        trailers
501                    }
502                    AwsChunkedBodyState::WritingTrailers => {
503                        let mut trailers = BytesMut::with_capacity(2);
504                        trailers.extend_from_slice(CRLF_RAW);
505                        trailers
506                    }
507                    AwsChunkedBodyState::Closed | AwsChunkedBodyState::WritingChunkSize => {
508                        unreachable!("{}", UNREACHABLE_STATES)
509                    }
510                };
511
512                let frame = http_body_1x::Frame::data(trailers.into());
513                *this.state = AwsChunkedBodyState::Closed;
514                Poll::Ready(Some(Ok(frame)))
515            }
516            // Passthrough states
517            Poll::Pending => Poll::Pending,
518            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
519        }
520    }
521}
522/// Utility functions to help with the [http_body_1x::Body] trait implementation
523mod http_1x_utils {
524    use std::task::Poll;
525
526    use super::{CRLF_RAW, TRAILER_SEPARATOR};
527    use bytes::{Bytes, BytesMut};
528    use http_1x::{HeaderMap, HeaderName};
529
530    /// Writes trailers out into a `string` and then converts that `String` to a `Bytes` before
531    /// returning.
532    ///
533    /// - Trailer names are separated by a single colon only, no space.
534    /// - Trailer names with multiple values will be written out one line per value, with the name
535    ///   appearing on each line.
536    pub(super) fn trailers_as_aws_chunked_bytes(
537        trailer_map: Option<&HeaderMap>,
538        mut buffer: BytesMut,
539    ) -> BytesMut {
540        if let Some(trailer_map) = trailer_map {
541            let mut current_header_name: Option<HeaderName> = None;
542
543            for (header_name, header_value) in trailer_map.clone().into_iter() {
544                // When a header has multiple values, the name only comes up in iteration the first time
545                // we see it. Therefore, we need to keep track of the last name we saw and fall back to
546                // it when `header_name == None`.
547                current_header_name = header_name.or(current_header_name);
548
549                // In practice, this will always exist, but `if let` is nicer than unwrap
550                if let Some(header_name) = current_header_name.as_ref() {
551                    buffer.extend_from_slice(header_name.as_ref());
552                    buffer.extend_from_slice(TRAILER_SEPARATOR);
553                    buffer.extend_from_slice(header_value.as_bytes());
554                    buffer.extend_from_slice(CRLF_RAW);
555                }
556            }
557
558            buffer
559        } else {
560            buffer
561        }
562    }
563
564    /// Given an optional `HeaderMap`, calculate the total number of bytes required to represent the
565    /// `HeaderMap`. If no `HeaderMap` is given as input, return 0.
566    ///
567    /// - Trailer names are separated by a single colon only, no space.
568    /// - Trailer names with multiple values will be written out one line per value, with the name
569    ///   appearing on each line.
570    pub(super) fn total_rendered_length_of_trailers(trailer_map: Option<&HeaderMap>) -> u64 {
571        match trailer_map {
572            Some(trailer_map) => trailer_map
573                .iter()
574                .map(|(trailer_name, trailer_value)| {
575                    trailer_name.as_str().len()
576                        + TRAILER_SEPARATOR.len()
577                        + trailer_value.len()
578                        + CRLF_RAW.len()
579                })
580                .sum::<usize>() as u64,
581            None => 0,
582        }
583    }
584
585    /// This is an ugly return type, but in practice it just returns `Ok(())` if the values match
586    /// and `Err(Poll::Ready(Some(Err(AwsChunkedBodyError::StreamLengthMismatch))))` if they don't
587    #[allow(clippy::type_complexity)]
588    pub(super) fn check_for_stream_length_mismatch(
589        actual_stream_length: u64,
590        expected_stream_length: u64,
591    ) -> Result<(), Poll<Option<Result<http_body_1x::Frame<Bytes>, aws_smithy_types::body::Error>>>>
592    {
593        if actual_stream_length != expected_stream_length {
594            let err = Box::new(super::AwsChunkedBodyError::StreamLengthMismatch {
595                actual: actual_stream_length,
596                expected: expected_stream_length,
597            });
598            return Err(Poll::Ready(Some(Err(err))));
599        };
600
601        Ok(())
602    }
603}
604
/// Failures detected while encoding an `AwsChunkedBody`.
#[derive(Debug)]
enum AwsChunkedBodyError {
    /// The trailers produced by the inner body do not add up to the total trailer length that
    /// was declared when the `AwsChunkedBody` was created. The declared lengths feed into the
    /// computed size of the whole body, so a mismatch means that size was wrong.
    ReportedTrailerLengthMismatch { actual: u64, expected: u64 },
    /// The inner body yielded a different number of bytes than the `stream_length` declared
    /// when the `AwsChunkedBody` was created. The declared length feeds into the computed size
    /// of the whole body, so a mismatch means that size was wrong.
    StreamLengthMismatch { actual: u64, expected: u64 },
}

impl std::fmt::Display for AwsChunkedBodyError {
    /// Writes a sentence naming both the declared (`expected`) and the observed (`actual`)
    /// length.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::ReportedTrailerLengthMismatch { actual, expected } => f.write_fmt(format_args!(
                "When creating this AwsChunkedBody, length of trailers was reported as {expected}. However, when double checking during trailer encoding, length was found to be {actual} instead."
            )),
            Self::StreamLengthMismatch { actual, expected } => f.write_fmt(format_args!(
                "When creating this AwsChunkedBody, stream length was reported as {expected}. However, when double checking during body encoding, length was found to be {actual} instead."
            )),
        }
    }
}

impl std::error::Error for AwsChunkedBodyError {}
633
// Counts how many hexadecimal digits are needed to write out an integer, by counting how many
// times it can be divided by 16 before it is no longer greater than zero. Zero itself yields 0.
fn int_log16<T>(mut i: T) -> u64
where
    T: std::ops::DivAssign + PartialOrd + From<u8> + Copy,
{
    let floor = T::from(0);
    let base = T::from(16);
    let mut digits = 0;

    loop {
        if i > floor {
            // Strip the lowest hex digit and count it.
            i /= base;
            digits += 1;
        } else {
            return digits;
        }
    }
}
650
651fn get_unsigned_chunk_bytes_length(payload_length: u64) -> u64 {
652    let hex_repr_len = int_log16(payload_length);
653    hex_repr_len + CRLF.len() as u64 + payload_length + CRLF.len() as u64
654}
655
656#[cfg(test)]
657mod tests {
658
659    #[cfg(test)]
660    #[cfg(feature = "http-02x")]
661    mod http_02x_tests {
662        use super::super::{
663            http_02x_utils::{total_rendered_length_of_trailers, trailers_as_aws_chunked_bytes},
664            AwsChunkedBody, AwsChunkedBodyOptions, CHUNK_TERMINATOR, CRLF,
665        };
666
667        use aws_smithy_types::body::SdkBody;
668        use bytes::{Buf, Bytes};
669        use bytes_utils::SegmentedBuf;
670        use http_02x::{HeaderMap, HeaderValue};
671        use http_body_04x::{Body, SizeHint};
672        use pin_project_lite::pin_project;
673
674        use std::io::Read;
675        use std::pin::Pin;
676        use std::task::{Context, Poll};
677        use std::time::Duration;
678
        pin_project! {
            // Test body that hands out `parts` one poll at a time. A `None` part makes the
            // poll return `Pending` and schedules a wake-up after `delay_in_millis`.
            struct SputteringBody {
                // Data to yield, in order; `None` entries stand for a pause.
                parts: Vec<Option<Bytes>>,
                // Index of the next entry of `parts` to hand out.
                cursor: usize,
                // How long to wait before waking the task after a pause.
                delay_in_millis: u64,
            }
        }
686
687        impl SputteringBody {
688            fn len(&self) -> usize {
689                self.parts.iter().flatten().map(|b| b.len()).sum()
690            }
691        }
692
693        impl Body for SputteringBody {
694            type Data = Bytes;
695            type Error = aws_smithy_types::body::Error;
696
697            fn poll_data(
698                self: Pin<&mut Self>,
699                cx: &mut Context<'_>,
700            ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
701                if self.cursor == self.parts.len() {
702                    return Poll::Ready(None);
703                }
704
705                let this = self.project();
706                let delay_in_millis = *this.delay_in_millis;
707                let next_part = this.parts.get_mut(*this.cursor).unwrap().take();
708
709                match next_part {
710                    None => {
711                        *this.cursor += 1;
712                        let waker = cx.waker().clone();
713                        tokio::spawn(async move {
714                            tokio::time::sleep(Duration::from_millis(delay_in_millis)).await;
715                            waker.wake();
716                        });
717                        Poll::Pending
718                    }
719                    Some(data) => {
720                        *this.cursor += 1;
721                        Poll::Ready(Some(Ok(data)))
722                    }
723                }
724            }
725
726            fn poll_trailers(
727                self: Pin<&mut Self>,
728                _cx: &mut Context<'_>,
729            ) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
730                Poll::Ready(Ok(None))
731            }
732
733            fn is_end_stream(&self) -> bool {
734                false
735            }
736
737            fn size_hint(&self) -> SizeHint {
738                SizeHint::new()
739            }
740        }
741
742        #[tokio::test]
743        async fn test_aws_chunked_encoding() {
744            let test_fut = async {
745                let input_str = "Hello world";
746                let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, Vec::new());
747                let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
748
749                let mut output = SegmentedBuf::new();
750                while let Some(buf) = body.data().await {
751                    output.push(buf.unwrap());
752                }
753
754                let mut actual_output = String::new();
755                output
756                    .reader()
757                    .read_to_string(&mut actual_output)
758                    .expect("Doesn't cause IO errors");
759
760                let expected_output = "B\r\nHello world\r\n0\r\n\r\n";
761
762                assert_eq!(expected_output, actual_output);
763                assert!(
764                    body.trailers()
765                        .await
766                        .expect("no errors occurred during trailer polling")
767                        .is_none(),
768                    "aws-chunked encoded bodies don't have normal HTTP trailers"
769                );
770
771                // You can insert a `tokio::time::sleep` here to verify the timeout works as intended
772            };
773
774            let timeout_duration = Duration::from_secs(3);
775            if tokio::time::timeout(timeout_duration, test_fut)
776                .await
777                .is_err()
778            {
779                panic!("test_aws_chunked_encoding timed out after {timeout_duration:?}");
780            }
781        }
782
783        #[tokio::test]
784        async fn test_aws_chunked_encoding_sputtering_body() {
785            let test_fut = async {
786                let input = SputteringBody {
787                    parts: vec![
788                        Some(Bytes::from_static(b"chunk 1, ")),
789                        None,
790                        Some(Bytes::from_static(b"chunk 2, ")),
791                        Some(Bytes::from_static(b"chunk 3, ")),
792                        None,
793                        None,
794                        Some(Bytes::from_static(b"chunk 4, ")),
795                        Some(Bytes::from_static(b"chunk 5, ")),
796                        Some(Bytes::from_static(b"chunk 6")),
797                    ],
798                    cursor: 0,
799                    delay_in_millis: 500,
800                };
801                let opts = AwsChunkedBodyOptions::new(input.len() as u64, Vec::new());
802                let mut body = AwsChunkedBody::new(input, opts);
803
804                let mut output = SegmentedBuf::new();
805                while let Some(buf) = body.data().await {
806                    output.push(buf.unwrap());
807                }
808
809                let mut actual_output = String::new();
810                output
811                    .reader()
812                    .read_to_string(&mut actual_output)
813                    .expect("Doesn't cause IO errors");
814
815                let expected_output =
816                    "34\r\nchunk 1, chunk 2, chunk 3, chunk 4, chunk 5, chunk 6\r\n0\r\n\r\n";
817
818                assert_eq!(expected_output, actual_output);
819                assert!(
820                    body.trailers()
821                        .await
822                        .expect("no errors occurred during trailer polling")
823                        .is_none(),
824                    "aws-chunked encoded bodies don't have normal HTTP trailers"
825                );
826            };
827
828            let timeout_duration = Duration::from_secs(3);
829            if tokio::time::timeout(timeout_duration, test_fut)
830                .await
831                .is_err()
832            {
833                panic!(
834                "test_aws_chunked_encoding_sputtering_body timed out after {timeout_duration:?}"
835            );
836            }
837        }
838
839        #[tokio::test]
840        #[should_panic = "called `Result::unwrap()` on an `Err` value: ReportedTrailerLengthMismatch { actual: 44, expected: 0 }"]
841        async fn test_aws_chunked_encoding_incorrect_trailer_length_panic() {
842            let input_str = "Hello world";
843            // Test body has no trailers, so this length is incorrect and will trigger an assert panic
844            // When the panic occurs, it will actually expect a length of 44. This is because, when using
845            // aws-chunked encoding, each trailer will end with a CRLF which is 2 bytes long.
846            let wrong_trailer_len = 42;
847            let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![wrong_trailer_len]);
848            let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
849
850            // We don't care about the body contents but we have to read it all before checking for trailers
851            while let Some(buf) = body.data().await {
852                drop(buf.unwrap());
853            }
854
855            assert!(
856                body.trailers()
857                    .await
858                    .expect("no errors occurred during trailer polling")
859                    .is_none(),
860                "aws-chunked encoded bodies don't have normal HTTP trailers"
861            );
862        }
863
864        #[tokio::test]
865        async fn test_aws_chunked_encoding_empty_body() {
866            let input_str = "";
867            let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, Vec::new());
868            let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
869
870            let mut output = SegmentedBuf::new();
871            while let Some(buf) = body.data().await {
872                output.push(buf.unwrap());
873            }
874
875            let mut actual_output = String::new();
876            output
877                .reader()
878                .read_to_string(&mut actual_output)
879                .expect("Doesn't cause IO errors");
880
881            let expected_output = [CHUNK_TERMINATOR, CRLF].concat();
882
883            assert_eq!(expected_output, actual_output);
884            assert!(
885                body.trailers()
886                    .await
887                    .expect("no errors occurred during trailer polling")
888                    .is_none(),
889                "aws-chunked encoded bodies don't have normal HTTP trailers"
890            );
891        }
892
893        #[tokio::test]
894        async fn test_total_rendered_length_of_trailers() {
895            let mut trailers = HeaderMap::new();
896
897            trailers.insert("empty_value", HeaderValue::from_static(""));
898
899            trailers.insert("single_value", HeaderValue::from_static("value 1"));
900
901            trailers.insert("two_values", HeaderValue::from_static("value 1"));
902            trailers.append("two_values", HeaderValue::from_static("value 2"));
903
904            trailers.insert("three_values", HeaderValue::from_static("value 1"));
905            trailers.append("three_values", HeaderValue::from_static("value 2"));
906            trailers.append("three_values", HeaderValue::from_static("value 3"));
907
908            let trailers = Some(trailers);
909            let actual_length = total_rendered_length_of_trailers(trailers.as_ref());
910            let expected_length =
911                (trailers_as_aws_chunked_bytes(trailers, actual_length).len()) as u64;
912
913            assert_eq!(expected_length, actual_length);
914        }
915
916        #[tokio::test]
917        async fn test_total_rendered_length_of_empty_trailers() {
918            let trailers = Some(HeaderMap::new());
919            let actual_length = total_rendered_length_of_trailers(trailers.as_ref());
920            let expected_length =
921                (trailers_as_aws_chunked_bytes(trailers, actual_length).len()) as u64;
922
923            assert_eq!(expected_length, actual_length);
924        }
925    }
926
927    #[cfg(test)]
928    mod http_1x_tests {
929        use super::super::{
930            http_1x_utils::{total_rendered_length_of_trailers, trailers_as_aws_chunked_bytes},
931            AwsChunkedBody, AwsChunkedBodyOptions, CHUNK_TERMINATOR_RAW, CRLF_RAW,
932        };
933
934        use aws_smithy_types::body::SdkBody;
935        use bytes::{Buf, Bytes, BytesMut};
936        use bytes_utils::SegmentedBuf;
937        use http_1x::{HeaderMap, HeaderValue};
938        use http_body_1x::{Body, Frame, SizeHint};
939        use http_body_util::BodyExt;
940        use pin_project_lite::pin_project;
941
942        use std::io::Read;
943        use std::pin::Pin;
944        use std::task::{Context, Poll};
945        use std::time::Duration;
946
        // Test-only body that works through a fixed list of parts, consuming one entry
        // per poll.
        pin_project! {
            struct SputteringBody {
                // Entries in delivery order: `Some` holds data to yield, `None` makes the
                // corresponding poll return `Pending`.
                parts: Vec<Option<Bytes>>,
                // Index into `parts` of the entry the next poll will look at.
                cursor: usize,
                // How long, in milliseconds, the spawned task sleeps before waking the
                // waker after a `None` entry.
                delay_in_millis: u64,
            }
        }
954
955        impl SputteringBody {
956            fn len(&self) -> usize {
957                self.parts.iter().flatten().map(|b| b.len()).sum()
958            }
959        }
960
961        impl Body for SputteringBody {
962            type Data = Bytes;
963            type Error = aws_smithy_types::body::Error;
964
965            fn poll_frame(
966                self: Pin<&mut Self>,
967                cx: &mut Context<'_>,
968            ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
969                if self.cursor == self.parts.len() {
970                    return Poll::Ready(None);
971                }
972
973                let this = self.project();
974                let delay_in_millis = *this.delay_in_millis;
975                let next_part = this.parts.get_mut(*this.cursor).unwrap().take();
976
977                match next_part {
978                    None => {
979                        *this.cursor += 1;
980                        let waker = cx.waker().clone();
981                        tokio::spawn(async move {
982                            tokio::time::sleep(Duration::from_millis(delay_in_millis)).await;
983                            waker.wake();
984                        });
985                        Poll::Pending
986                    }
987                    Some(data) => {
988                        *this.cursor += 1;
989                        let frame = Frame::data(data);
990                        Poll::Ready(Some(Ok(frame)))
991                    }
992                }
993            }
994
995            fn is_end_stream(&self) -> bool {
996                false
997            }
998
999            fn size_hint(&self) -> SizeHint {
1000                SizeHint::new()
1001            }
1002        }
1003
1004        #[tokio::test]
1005        async fn test_aws_chunked_encoding() {
1006            let test_fut = async {
1007                let input_str = "Hello world";
1008                let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![]);
1009                let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
1010
1011                let mut output = SegmentedBuf::new();
1012                while let Some(Ok(buf)) = body.frame().await {
1013                    output.push(buf.into_data().unwrap());
1014                }
1015
1016                let mut actual_output = String::new();
1017                output
1018                    .reader()
1019                    .read_to_string(&mut actual_output)
1020                    .expect("Doesn't cause IO errors");
1021
1022                let expected_output = "B\r\nHello world\r\n0\r\n\r\n";
1023
1024                assert_eq!(expected_output, actual_output);
1025
1026                // You can insert a `tokio::time::sleep` here to verify the timeout works as intended
1027            };
1028
1029            let timeout_duration = Duration::from_secs(3);
1030            if tokio::time::timeout(timeout_duration, test_fut)
1031                .await
1032                .is_err()
1033            {
1034                panic!("test_aws_chunked_encoding timed out after {timeout_duration:?}");
1035            }
1036        }
1037
1038        #[tokio::test]
1039        async fn test_aws_chunked_encoding_sputtering_body() {
1040            let test_fut = async {
1041                let input = SputteringBody {
1042                    parts: vec![
1043                        Some(Bytes::from_static(b"chunk 1, ")),
1044                        None,
1045                        Some(Bytes::from_static(b"chunk 2, ")),
1046                        Some(Bytes::from_static(b"chunk 3, ")),
1047                        None,
1048                        None,
1049                        Some(Bytes::from_static(b"chunk 4, ")),
1050                        Some(Bytes::from_static(b"chunk 5, ")),
1051                        Some(Bytes::from_static(b"chunk 6")),
1052                    ],
1053                    cursor: 0,
1054                    delay_in_millis: 500,
1055                };
1056                let opts = AwsChunkedBodyOptions::new(input.len() as u64, vec![]);
1057                let mut body = AwsChunkedBody::new(input, opts);
1058
1059                let mut output = SegmentedBuf::new();
1060                while let Some(Ok(buf)) = body.frame().await {
1061                    output.push(buf.into_data().unwrap());
1062                }
1063
1064                let mut actual_output = String::new();
1065                output
1066                    .reader()
1067                    .read_to_string(&mut actual_output)
1068                    .expect("Doesn't cause IO errors");
1069
1070                let expected_output =
1071                    "34\r\nchunk 1, chunk 2, chunk 3, chunk 4, chunk 5, chunk 6\r\n0\r\n\r\n";
1072
1073                assert_eq!(expected_output, actual_output);
1074            };
1075
1076            let timeout_duration = Duration::from_secs(3);
1077            if tokio::time::timeout(timeout_duration, test_fut)
1078                .await
1079                .is_err()
1080            {
1081                panic!(
1082                "test_aws_chunked_encoding_sputtering_body timed out after {timeout_duration:?}"
1083            );
1084            }
1085        }
1086
1087        #[tokio::test]
1088        async fn test_aws_chunked_encoding_incorrect_trailer_length_panic() {
1089            let input_str = "Hello world";
1090            // Test body has no trailers, so this length is incorrect and will trigger an assert panic
1091            // When the panic occurs, it will actually expect a length of 44. This is because, when using
1092            // aws-chunked encoding, each trailer will end with a CRLF which is 2 bytes long.
1093            let wrong_trailer_len = 42;
1094            let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![wrong_trailer_len]);
1095            let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
1096
1097            // We don't care about the body contents but we have to read it all before checking for trailers
1098            while let Some(Ok(frame)) = body.frame().await {
1099                assert!(!frame.is_trailers());
1100            }
1101        }
1102
1103        #[tokio::test]
1104        async fn test_aws_chunked_encoding_empty_body() {
1105            let input_str = "";
1106            let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![]);
1107            let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
1108
1109            let mut output = SegmentedBuf::new();
1110            while let Some(Ok(frame)) = body.frame().await {
1111                output.push(frame.into_data().unwrap());
1112            }
1113
1114            let mut actual_output = String::new();
1115            output
1116                .reader()
1117                .read_to_string(&mut actual_output)
1118                .expect("Doesn't cause IO errors");
1119
1120            let actual_output = std::str::from_utf8(actual_output.as_bytes()).unwrap();
1121            let expected_output = [CHUNK_TERMINATOR_RAW, CRLF_RAW].concat();
1122            let expected_output = std::str::from_utf8(&expected_output).unwrap();
1123
1124            assert_eq!(expected_output, actual_output);
1125        }
1126
1127        #[tokio::test]
1128        async fn test_total_rendered_length_of_trailers() {
1129            let mut trailers = HeaderMap::new();
1130
1131            trailers.insert("empty_value", HeaderValue::from_static(""));
1132
1133            trailers.insert("single_value", HeaderValue::from_static("value 1"));
1134
1135            trailers.insert("two_values", HeaderValue::from_static("value 1"));
1136            trailers.append("two_values", HeaderValue::from_static("value 2"));
1137
1138            trailers.insert("three_values", HeaderValue::from_static("value 1"));
1139            trailers.append("three_values", HeaderValue::from_static("value 2"));
1140            trailers.append("three_values", HeaderValue::from_static("value 3"));
1141
1142            let trailers = Some(&trailers);
1143            let actual_length = total_rendered_length_of_trailers(trailers);
1144            let buf = BytesMut::with_capacity(actual_length as usize);
1145            let expected_length = (trailers_as_aws_chunked_bytes(trailers, buf).len()) as u64;
1146
1147            assert_eq!(expected_length, actual_length);
1148        }
1149
1150        #[tokio::test]
1151        async fn test_total_rendered_length_of_empty_trailers() {
1152            let header_map = HeaderMap::new();
1153            let trailers = Some(&header_map);
1154            let actual_length = total_rendered_length_of_trailers(trailers);
1155            let buf = BytesMut::with_capacity(actual_length as usize);
1156            let expected_length = (trailers_as_aws_chunked_bytes(trailers, buf).len()) as u64;
1157
1158            assert_eq!(expected_length, actual_length);
1159        }
1160    }
1161}