aws_runtime/content_encoding/
body.rs1#[cfg(feature = "http-02x")]
7mod http_body_0_x;
8
9mod http_body_1_x;
10
11use crate::content_encoding::{AwsChunkedBodyOptions, SignChunk};
12use aws_sigv4::http_request::SigningError;
13use bytes::{Buf, Bytes};
14use bytes_utils::SegmentedBuf;
15use pin_project_lite::pin_project;
16use std::pin::Pin;
17use std::task::{Context, Poll};
18
19#[derive(Debug)]
20pub(super) enum ChunkBuf {
21 Empty,
23 Partial(SegmentedBuf<Bytes>),
26 EosPartial(SegmentedBuf<Bytes>),
28 Terminated,
30}
31
32impl ChunkBuf {
33 pub(super) fn remaining(&self) -> usize {
35 match self {
36 ChunkBuf::Empty | ChunkBuf::Terminated => 0,
37 ChunkBuf::Partial(segments) | ChunkBuf::EosPartial(segments) => segments.remaining(),
38 }
39 }
40
41 pub(super) fn is_eos(&self) -> bool {
43 matches!(self, ChunkBuf::EosPartial(_) | ChunkBuf::Terminated)
44 }
45
46 pub(super) fn buffered(&mut self) -> &mut SegmentedBuf<Bytes> {
48 match self {
49 ChunkBuf::Empty => panic!("buffer must be populated before reading; this is a bug"),
50 ChunkBuf::Partial(segmented) => segmented,
51 ChunkBuf::EosPartial(segmented) => segmented,
52 ChunkBuf::Terminated => panic!("buffer has been terminated; this is a bug"),
53 }
54 }
55
56 pub(super) fn ended(self) -> Self {
58 match self {
59 ChunkBuf::Empty => ChunkBuf::EosPartial(SegmentedBuf::new()),
60 ChunkBuf::Partial(segmented) => ChunkBuf::EosPartial(segmented),
61 ChunkBuf::EosPartial(_) => panic!("already end of stream; this is a bug"),
62 ChunkBuf::Terminated => panic!("stream terminated; this is a bug"),
63 }
64 }
65}
66
67#[derive(Debug, PartialEq, Eq)]
68pub(super) enum AwsChunkedBodyState {
69 WritingChunk,
71 #[cfg(feature = "http-02x")]
72 WritingChunkData,
75 WritingZeroSizedSignedChunk,
77 PollingTrailers,
79 WritingTrailers,
82 Closed,
84}
85
86pin_project! {
87 #[derive(Debug)]
92 pub struct AwsChunkedBody<InnerBody> {
93 #[pin]
94 pub(super) inner: InnerBody,
95 #[pin]
96 pub(super) state: AwsChunkedBodyState,
97 pub(super) options: AwsChunkedBodyOptions,
98 pub(super) inner_body_bytes_read_so_far: usize,
99 #[pin]
100 pub(super) chunk_buffer: ChunkBuf,
101 #[pin]
102 pub(super) buffered_trailing_headers: Option<http_1x::HeaderMap>,
103 #[pin]
104 pub(super) signer: Option<std::panic::AssertUnwindSafe<Box<dyn SignChunk + Send + Sync>>>,
105 }
106}
107
108impl<Inner> AwsChunkedBody<Inner> {
109 pub fn new(body: Inner, options: AwsChunkedBodyOptions) -> Self {
111 Self {
112 inner: body,
113 state: AwsChunkedBodyState::WritingChunk,
114 options,
115 inner_body_bytes_read_so_far: 0,
116 chunk_buffer: ChunkBuf::Empty,
117 buffered_trailing_headers: None,
118 signer: None,
119 }
120 }
121
122 #[allow(private_bounds)] pub fn with_signer<S>(mut self, signer: S) -> Self
125 where
126 S: SignChunk + Send + Sync + 'static,
127 {
128 self.signer = Some(std::panic::AssertUnwindSafe(Box::new(signer)));
129 self
130 }
131
132 pub(super) fn buffer_next_chunk(
138 inner: Pin<&mut Inner>,
139 mut chunk_buffer: Pin<&mut ChunkBuf>,
140 mut buffered_trailing_headers: Pin<&mut Option<http_1x::HeaderMap>>,
141 cx: &mut Context<'_>,
142 ) -> Poll<Result<bool, aws_smithy_types::body::Error>>
143 where
144 Inner: http_body_1x::Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
145 {
146 match inner.poll_frame(cx) {
147 Poll::Ready(Some(Ok(frame))) => {
148 if frame.is_data() {
149 let data = frame.into_data().expect("just checked to be data");
150 match chunk_buffer.as_mut().get_mut() {
151 ChunkBuf::Empty => {
152 let mut buf = SegmentedBuf::new();
153 buf.push(data);
154 *chunk_buffer.as_mut().get_mut() = ChunkBuf::Partial(buf);
155 }
156 ChunkBuf::Partial(buf) => buf.push(data),
157 ChunkBuf::EosPartial(_) | ChunkBuf::Terminated => {
158 panic!("cannot buffer more data after the stream has ended or been terminated; this is a bug")
159 }
160 }
161 Poll::Ready(Ok(true))
162 } else {
163 let buf = chunk_buffer.as_mut().get_mut();
164 *buf = std::mem::replace(buf, ChunkBuf::Empty).ended();
165 *buffered_trailing_headers.as_mut().get_mut() = frame.into_trailers().ok();
166 Poll::Ready(Ok(false))
167 }
168 }
169 Poll::Ready(Some(Err(e))) => {
170 *chunk_buffer.as_mut().get_mut() = ChunkBuf::Terminated;
171 Poll::Ready(Err(e))
172 }
173 Poll::Ready(None) => Poll::Ready(Ok(false)),
174 Poll::Pending => Poll::Pending,
175 }
176 }
177}
178
179#[derive(Debug)]
180pub(super) enum AwsChunkedBodyError {
181 ReportedTrailerLengthMismatch { actual: u64, expected: u64 },
186 StreamLengthMismatch { actual: u64, expected: u64 },
190 FailedToSign { source: SigningError },
192}
193
194impl std::fmt::Display for AwsChunkedBodyError {
195 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196 match self {
197 Self::ReportedTrailerLengthMismatch { actual, expected } => {
198 write!(f, "When creating this AwsChunkedBody, length of trailers was reported as {expected}. However, when double checking during trailer encoding, length was found to be {actual} instead.")
199 }
200 Self::StreamLengthMismatch { actual, expected } => {
201 write!(f, "When creating this AwsChunkedBody, stream length was reported as {expected}. However, when double checking during body encoding, length was found to be {actual} instead.")
202 }
203 Self::FailedToSign { source } => {
204 write!(f, "Signing error during aws-chunked encoding: {source}")
205 }
206 }
207 }
208}
209
210impl std::error::Error for AwsChunkedBodyError {}
211
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check: the test fails to build if `AwsChunkedBody` ever stops being both
    /// `UnwindSafe` and `RefUnwindSafe`.
    #[test]
    fn test_aws_chunked_body_is_unwind_safe_and_ref_unwind_safe() {
        fn require_unwind_safety<T>()
        where
            T: std::panic::UnwindSafe + std::panic::RefUnwindSafe,
        {
        }

        require_unwind_safety::<AwsChunkedBody<()>>();
    }
}