Skip to main content

aws_runtime/content_encoding/
body.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6#[cfg(feature = "http-02x")]
7mod http_body_0_x;
8
9mod http_body_1_x;
10
11use crate::content_encoding::{AwsChunkedBodyOptions, SignChunk};
12use aws_sigv4::http_request::SigningError;
13use bytes::{Buf, Bytes};
14use bytes_utils::SegmentedBuf;
15use pin_project_lite::pin_project;
16use std::pin::Pin;
17use std::task::{Context, Poll};
18
19#[derive(Debug)]
20pub(super) enum ChunkBuf {
21    /// Nothing has been buffered yet.
22    Empty,
23    /// Some data has been buffered.
24    /// The SegmentedBuf will automatically purge when it reads off the end of a chunk boundary.
25    Partial(SegmentedBuf<Bytes>),
26    /// The end of the stream has been reached, but there may still be some buffered data.
27    EosPartial(SegmentedBuf<Bytes>),
28    /// An exception terminated this stream.
29    Terminated,
30}
31
32impl ChunkBuf {
33    /// Return true if there's more buffered data.
34    pub(super) fn remaining(&self) -> usize {
35        match self {
36            ChunkBuf::Empty | ChunkBuf::Terminated => 0,
37            ChunkBuf::Partial(segments) | ChunkBuf::EosPartial(segments) => segments.remaining(),
38        }
39    }
40
41    /// Return true if the stream has ended.
42    pub(super) fn is_eos(&self) -> bool {
43        matches!(self, ChunkBuf::EosPartial(_) | ChunkBuf::Terminated)
44    }
45
46    /// Return a mutable reference to the underlying buffered data.
47    pub(super) fn buffered(&mut self) -> &mut SegmentedBuf<Bytes> {
48        match self {
49            ChunkBuf::Empty => panic!("buffer must be populated before reading; this is a bug"),
50            ChunkBuf::Partial(segmented) => segmented,
51            ChunkBuf::EosPartial(segmented) => segmented,
52            ChunkBuf::Terminated => panic!("buffer has been terminated; this is a bug"),
53        }
54    }
55
56    /// Return a `ChunkBuf` that has reached end of stream.
57    pub(super) fn ended(self) -> Self {
58        match self {
59            ChunkBuf::Empty => ChunkBuf::EosPartial(SegmentedBuf::new()),
60            ChunkBuf::Partial(segmented) => ChunkBuf::EosPartial(segmented),
61            ChunkBuf::EosPartial(_) => panic!("already end of stream; this is a bug"),
62            ChunkBuf::Terminated => panic!("stream terminated; this is a bug"),
63        }
64    }
65}
66
67#[derive(Debug, PartialEq, Eq)]
68pub(super) enum AwsChunkedBodyState {
69    /// Write out the chunk data.
70    WritingChunk,
71    #[cfg(feature = "http-02x")]
72    /// Write out the next chunk of data. Multiple polls of the inner body may need to occur before
73    /// all data is written out.
74    WritingChunkData,
75    /// Write out a zero-sized signed chunk.
76    WritingZeroSizedSignedChunk,
77    /// Buffer all trailers from the inner body, which avoids assuming trailing headers fit in a single frame.
78    PollingTrailers,
79    /// Write out all trailers associated with this `AwsChunkedBody` and then transition into the
80    /// `Closed` state.
81    WritingTrailers,
82    /// This is the final state. Write out the body terminator and then remain in this state.
83    Closed,
84}
85
86pin_project! {
87    /// A request body compatible with `Content-Encoding: aws-chunked`.
88    ///
89    /// See [SigV4 streaming](https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html)
90    /// and [streaming trailers](https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming-trailers.html).
91    #[derive(Debug)]
92    pub struct AwsChunkedBody<InnerBody> {
93        #[pin]
94        pub(super) inner: InnerBody,
95        #[pin]
96        pub(super) state: AwsChunkedBodyState,
97        pub(super) options: AwsChunkedBodyOptions,
98        pub(super) inner_body_bytes_read_so_far: usize,
99        #[pin]
100        pub(super) chunk_buffer: ChunkBuf,
101        #[pin]
102        pub(super) buffered_trailing_headers: Option<http_1x::HeaderMap>,
103        #[pin]
104        pub(super) signer: Option<std::panic::AssertUnwindSafe<Box<dyn SignChunk + Send + Sync>>>,
105    }
106}
107
108impl<Inner> AwsChunkedBody<Inner> {
109    /// Wrap the given body in an outer body compatible with `Content-Encoding: aws-chunked`
110    pub fn new(body: Inner, options: AwsChunkedBodyOptions) -> Self {
111        Self {
112            inner: body,
113            state: AwsChunkedBodyState::WritingChunk,
114            options,
115            inner_body_bytes_read_so_far: 0,
116            chunk_buffer: ChunkBuf::Empty,
117            buffered_trailing_headers: None,
118            signer: None,
119        }
120    }
121
122    /// Set signer for signing chunks and trailers.
123    #[allow(private_bounds)] // Until we support chunk signing for a custom signer, the trait does not need to be public
124    pub fn with_signer<S>(mut self, signer: S) -> Self
125    where
126        S: SignChunk + Send + Sync + 'static,
127    {
128        self.signer = Some(std::panic::AssertUnwindSafe(Box::new(signer)));
129        self
130    }
131
132    // Buffer the next chunk from the inner body into the provided `chunk_buffer`, and return
133    // whether or not it should continue reading from `inner`.
134    //
135    // If it has exhausted data frames and started polling trailers, the buffered trailer will be
136    // pushed into `buffered_trailing_headers`, immediately marking the `chunk_buffer` as `eos`.
137    pub(super) fn buffer_next_chunk(
138        inner: Pin<&mut Inner>,
139        mut chunk_buffer: Pin<&mut ChunkBuf>,
140        mut buffered_trailing_headers: Pin<&mut Option<http_1x::HeaderMap>>,
141        cx: &mut Context<'_>,
142    ) -> Poll<Result<bool, aws_smithy_types::body::Error>>
143    where
144        Inner: http_body_1x::Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
145    {
146        match inner.poll_frame(cx) {
147            Poll::Ready(Some(Ok(frame))) => {
148                if frame.is_data() {
149                    let data = frame.into_data().expect("just checked to be data");
150                    match chunk_buffer.as_mut().get_mut() {
151                        ChunkBuf::Empty => {
152                            let mut buf = SegmentedBuf::new();
153                            buf.push(data);
154                            *chunk_buffer.as_mut().get_mut() = ChunkBuf::Partial(buf);
155                        }
156                        ChunkBuf::Partial(buf) => buf.push(data),
157                        ChunkBuf::EosPartial(_) | ChunkBuf::Terminated => {
158                            panic!("cannot buffer more data after the stream has ended or been terminated; this is a bug")
159                        }
160                    }
161                    Poll::Ready(Ok(true))
162                } else {
163                    let buf = chunk_buffer.as_mut().get_mut();
164                    *buf = std::mem::replace(buf, ChunkBuf::Empty).ended();
165                    *buffered_trailing_headers.as_mut().get_mut() = frame.into_trailers().ok();
166                    Poll::Ready(Ok(false))
167                }
168            }
169            Poll::Ready(Some(Err(e))) => {
170                *chunk_buffer.as_mut().get_mut() = ChunkBuf::Terminated;
171                Poll::Ready(Err(e))
172            }
173            Poll::Ready(None) => Poll::Ready(Ok(false)),
174            Poll::Pending => Poll::Pending,
175        }
176    }
177}
178
179#[derive(Debug)]
180pub(super) enum AwsChunkedBodyError {
181    /// Error that occurs when the sum of `trailer_lengths` set when creating an `AwsChunkedBody` is
182    /// not equal to the actual length of the trailers returned by the inner `http_body::Body`
183    /// implementor. These trailer lengths are necessary in order to correctly calculate the total
184    /// size of the body for setting the content length header.
185    ReportedTrailerLengthMismatch { actual: u64, expected: u64 },
186    /// Error that occurs when the `stream_length` set when creating an `AwsChunkedBody` is not
187    /// equal to the actual length of the body returned by the inner `http_body::Body` implementor.
188    /// `stream_length` must be correct in order to set an accurate content length header.
189    StreamLengthMismatch { actual: u64, expected: u64 },
190    /// Error that occurs when signing a chunk fails.
191    FailedToSign { source: SigningError },
192}
193
194impl std::fmt::Display for AwsChunkedBodyError {
195    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196        match self {
197            Self::ReportedTrailerLengthMismatch { actual, expected } => {
198                write!(f, "When creating this AwsChunkedBody, length of trailers was reported as {expected}. However, when double checking during trailer encoding, length was found to be {actual} instead.")
199            }
200            Self::StreamLengthMismatch { actual, expected } => {
201                write!(f, "When creating this AwsChunkedBody, stream length was reported as {expected}. However, when double checking during body encoding, length was found to be {actual} instead.")
202            }
203            Self::FailedToSign { source } => {
204                write!(f, "Signing error during aws-chunked encoding: {source}")
205            }
206        }
207    }
208}
209
210impl std::error::Error for AwsChunkedBodyError {}
211
212#[cfg(test)]
213mod tests {
214    use super::*;
215
216    #[test]
217    fn test_aws_chunked_body_is_unwind_safe_and_ref_unwind_safe() {
218        fn assert_unwind_safe<T: std::panic::UnwindSafe>() {}
219        fn assert_ref_unwind_safe<T: std::panic::RefUnwindSafe>() {}
220
221        assert_unwind_safe::<AwsChunkedBody<()>>();
222        assert_ref_unwind_safe::<AwsChunkedBody<()>>();
223    }
224}