Skip to main content

aws_runtime/content_encoding/body/
http_body_1_x.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use bytes::{Buf, Bytes, BytesMut};
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use crate::content_encoding::body::{AwsChunkedBody, AwsChunkedBodyError, AwsChunkedBodyState};
11use crate::content_encoding::{
12    header, SignChunk, CHUNK_SIGNATURE_BEGIN_RAW, CHUNK_TERMINATOR_RAW, CRLF_RAW, TRAILER_SEPARATOR,
13};
14use aws_sigv4::http_request::SigningError;
15use aws_smithy_runtime_api::http::Headers;
16
/// Expands to a mutable reference to the chunk signer held by a projected `AwsChunkedBody`.
///
/// The expansion re-borrows the pinned `signer` field, reaches the optional value inside it,
/// and then calls `as_mut()` on the first tuple field of the stored wrapper.
///
/// # Panics
///
/// Panics with `"signer must be set"` when the `signer` field holds `None`.
macro_rules! signer_mut {
    ($projected:expr) => {
        $projected.signer.as_mut().get_mut()
            .as_mut()
            .expect("signer must be set")
            .0
            .as_mut()
    };
}
29
30impl<Inner> http_body_1x::Body for AwsChunkedBody<Inner>
31where
32    Inner: http_body_1x::Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
33{
34    type Data = Bytes;
35    type Error = aws_smithy_types::body::Error;
36
37    fn is_end_stream(&self) -> bool {
38        self.state == AwsChunkedBodyState::Closed
39    }
40
41    fn size_hint(&self) -> http_body_1x::SizeHint {
42        http_body_1x::SizeHint::with_exact(self.options.encoded_length())
43    }
44
45    fn poll_frame(
46        self: Pin<&mut Self>,
47        cx: &mut Context<'_>,
48    ) -> Poll<Option<Result<http_body_1x::Frame<Self::Data>, Self::Error>>> {
49        tracing::trace!(state = ?self.state, "polling AwsChunkedBody");
50        let mut this = self.project();
51        let chunk_size = this.options.chunk_size();
52
53        use AwsChunkedBodyState::*;
54        match *this.state {
55            WritingChunk => {
56                while !this.chunk_buffer.is_eos() {
57                    if this.chunk_buffer.remaining() >= chunk_size {
58                        let buf = this.chunk_buffer.buffered();
59                        let chunk_bytes = buf.copy_to_bytes(chunk_size);
60                        let chunk = if this.options.is_signed {
61                            let signer = signer_mut!(this);
62                            signed_encoded_chunk(signer, chunk_bytes).map_err(|e| {
63                                Box::new(AwsChunkedBodyError::FailedToSign { source: e })
64                            })?
65                        } else {
66                            unsigned_encoded_chunk(chunk_bytes)
67                        };
68                        *this.inner_body_bytes_read_so_far += chunk_size;
69                        tracing::trace!("writing chunk data: {:#?}", chunk);
70                        return Poll::Ready(Some(Ok(http_body_1x::Frame::data(chunk))));
71                    }
72
73                    match Self::buffer_next_chunk(
74                        this.inner.as_mut(),
75                        this.chunk_buffer.as_mut(),
76                        this.buffered_trailing_headers.as_mut(),
77                        cx,
78                    ) {
79                        Poll::Ready(Ok(true)) => continue,
80                        Poll::Ready(Ok(false)) => break,
81                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
82                        Poll::Pending => return Poll::Pending,
83                    }
84                }
85
86                if this.chunk_buffer.remaining() > 0 {
87                    let bytes_len_to_read =
88                        std::cmp::min(this.chunk_buffer.remaining(), chunk_size);
89                    let buf = this.chunk_buffer.buffered();
90                    let chunk_bytes = buf.copy_to_bytes(bytes_len_to_read);
91                    let chunk = if this.options.is_signed {
92                        let signer = signer_mut!(this);
93                        signed_encoded_chunk(signer, chunk_bytes).map_err(|e| {
94                            Box::new(AwsChunkedBodyError::FailedToSign { source: e })
95                        })?
96                    } else {
97                        unsigned_encoded_chunk(chunk_bytes)
98                    };
99                    *this.inner_body_bytes_read_so_far += bytes_len_to_read;
100                    tracing::trace!("remaining chunk data: {:#?}", chunk);
101                    return Poll::Ready(Some(Ok(http_body_1x::Frame::data(chunk))));
102                }
103
104                debug_assert!(this.chunk_buffer.remaining() == 0);
105
106                // We exhausted the body data, now check if the length is correct
107                if let Err(poll_stream_len_err) = check_for_stream_length_mismatch(
108                    *this.inner_body_bytes_read_so_far as u64,
109                    this.options.stream_length,
110                ) {
111                    return poll_stream_len_err;
112                }
113
114                if this.options.is_signed {
115                    *this.state = WritingZeroSizedSignedChunk;
116                } else {
117                    *this.state = PollingTrailers;
118                }
119                // Inner future has already returned `Ready` - no active waker.
120                // Wake explicitly to ensure the task gets polled again.
121                cx.waker().wake_by_ref();
122                Poll::Pending
123            }
124            WritingZeroSizedSignedChunk => {
125                let signer = signer_mut!(this);
126                let zero_sized_chunk = signed_encoded_chunk(signer, Bytes::new())
127                    .map_err(|e| Box::new(AwsChunkedBodyError::FailedToSign { source: e }))?;
128                if this.buffered_trailing_headers.is_some() {
129                    *this.state = PollingTrailers;
130                    let mut zero_sized_chunk = BytesMut::from(&zero_sized_chunk[..]);
131                    debug_assert!(zero_sized_chunk.ends_with(b"\r\n\r\n"));
132                    // For trailing checksum, we do not want the second CRLF as the checksum is appended in-between two CRLFs
133                    zero_sized_chunk.truncate(zero_sized_chunk.len() - 2);
134                    let zero_sized_chunk = zero_sized_chunk.freeze();
135                    tracing::trace!("writing zero sized signed chunk: {:#?}", zero_sized_chunk);
136                    Poll::Ready(Some(Ok(http_body_1x::Frame::data(zero_sized_chunk))))
137                } else {
138                    *this.state = Closed;
139                    tracing::trace!(
140                        "writing zero sized signed chunk without trailer: {:#?}",
141                        zero_sized_chunk
142                    );
143                    Poll::Ready(Some(Ok(http_body_1x::Frame::data(zero_sized_chunk))))
144                }
145            }
146            PollingTrailers => match this.inner.as_mut().poll_frame(cx) {
147                Poll::Ready(Some(Ok(frame))) => {
148                    let trailers = frame.into_trailers().ok();
149                    if let Some(trailers) = trailers {
150                        match this.buffered_trailing_headers.as_mut().get_mut() {
151                            Some(existing) => existing.extend(trailers),
152                            None => {
153                                *this.buffered_trailing_headers.as_mut().get_mut() = Some(trailers)
154                            }
155                        }
156                    }
157                    // Inner future has already returned `Ready` - no active waker.
158                    // Wake explicitly to ensure the task gets polled again.
159                    cx.waker().wake_by_ref();
160                    Poll::Pending
161                }
162                Poll::Ready(Some(Err(err))) => {
163                    tracing::error!(error = ?err, "error polling inner");
164                    Poll::Ready(Some(Err(err)))
165                }
166                Poll::Ready(None) => {
167                    *this.state = WritingTrailers;
168                    // Inner future has already returned `Ready` - no active waker.
169                    // Wake explicitly to ensure the task gets polled again.
170                    cx.waker().wake_by_ref();
171                    Poll::Pending
172                }
173                Poll::Pending => Poll::Pending,
174            },
175            WritingTrailers => {
176                let mut final_chunk = if this.options.is_signed {
177                    BytesMut::new()
178                } else {
179                    BytesMut::from(CHUNK_TERMINATOR_RAW)
180                };
181
182                let trailer_bytes = if let Some(mut trailer) = this.buffered_trailing_headers.take()
183                {
184                    let mut trailer_bytes = BytesMut::new();
185                    let trailer = if this.options.is_signed && !trailer.is_empty() {
186                        let signer = signer_mut!(this);
187                        let signature = signer
188                            .trailer_signature(&Headers::try_from(trailer.clone())?)
189                            .map_err(|e| {
190                                Box::new(AwsChunkedBodyError::FailedToSign { source: e })
191                            })?;
192                        trailer.insert(
193                            http_1x::header::HeaderName::from_static(
194                                header::X_AMZ_TRAILER_SIGNATURE,
195                            ),
196                            http_1x::header::HeaderValue::from_str(&signature).unwrap(),
197                        );
198                        trailer
199                    } else {
200                        trailer
201                    };
202
203                    let actual_length: u64 = total_rendered_length_of_trailers(Some(&trailer));
204                    let expected_length = this.options.total_trailer_length();
205                    if expected_length != actual_length {
206                        let err = AwsChunkedBodyError::ReportedTrailerLengthMismatch {
207                            actual: actual_length,
208                            expected: expected_length,
209                        };
210                        return Poll::Ready(Some(Err(err.into())));
211                    }
212
213                    trailer_bytes = trailers_as_aws_chunked_bytes(Some(&trailer), trailer_bytes);
214                    trailer_bytes.freeze()
215                } else {
216                    Bytes::new()
217                };
218
219                *this.state = Closed;
220
221                if final_chunk.is_empty() && trailer_bytes.is_empty() {
222                    // Case for signed aws-chunked encoding with no trailers
223                    return Poll::Ready(None);
224                }
225
226                final_chunk.extend_from_slice(&trailer_bytes);
227                final_chunk.extend_from_slice(CRLF_RAW);
228
229                tracing::trace!("final chunk: {:#?}", final_chunk);
230                Poll::Ready(Some(Ok(http_body_1x::Frame::data(final_chunk.freeze()))))
231            }
232            Closed => Poll::Ready(None),
233            #[allow(unreachable_patterns)]
234            // needed when cargo feature `http-02x` is enabled, bringing in an unused enum `WritingChunkData`
235            ref otherwise => {
236                unreachable!(
237                    "invalid state {otherwise:?} for `poll_frame` in http-1x; this is a bug"
238                )
239            }
240        }
241    }
242}
243
244fn signed_encoded_chunk(
245    signer: &mut (dyn SignChunk + Send + Sync),
246    chunk_bytes: Bytes,
247) -> Result<Bytes, SigningError> {
248    let chunk_size = format!("{:X}", chunk_bytes.len());
249    let mut chunk = BytesMut::new();
250    chunk.extend_from_slice(chunk_size.as_bytes());
251    chunk.extend_from_slice(CHUNK_SIGNATURE_BEGIN_RAW);
252    chunk.extend_from_slice(signer.chunk_signature(&chunk_bytes)?.as_bytes());
253    chunk.extend_from_slice(CRLF_RAW);
254    chunk.extend_from_slice(&chunk_bytes);
255    chunk.extend_from_slice(CRLF_RAW);
256    Ok(chunk.freeze())
257}
258
259fn unsigned_encoded_chunk(chunk_bytes: Bytes) -> Bytes {
260    let chunk_size = format!("{:X}", chunk_bytes.len());
261    let mut chunk = BytesMut::new();
262    chunk.extend_from_slice(chunk_size.as_bytes());
263    chunk.extend_from_slice(CRLF_RAW);
264    chunk.extend_from_slice(&chunk_bytes);
265    chunk.extend_from_slice(CRLF_RAW);
266    chunk.freeze()
267}
268
269/// Writes trailers out into a byte array `buffer`.
270///
271/// - Trailer names are separated by a single colon only, no space.
272/// - Trailer names with multiple values will be written out one line per value, with the name
273///   appearing on each line.
274fn trailers_as_aws_chunked_bytes(
275    trailer_map: Option<&http_1x::HeaderMap>,
276    mut buffer: BytesMut,
277) -> BytesMut {
278    if let Some(trailer_map) = trailer_map {
279        let mut current_header_name: Option<http_1x::header::HeaderName> = None;
280
281        for (header_name, header_value) in trailer_map.clone().into_iter() {
282            // When a header has multiple values, the name only comes up in iteration the first time
283            // we see it. Therefore, we need to keep track of the last name we saw and fall back to
284            // it when `header_name == None`.
285            current_header_name = header_name.or(current_header_name);
286
287            // In practice, this will always exist, but `if let` is nicer than unwrap
288            if let Some(header_name) = current_header_name.as_ref() {
289                buffer.extend_from_slice(header_name.as_ref());
290                buffer.extend_from_slice(TRAILER_SEPARATOR);
291                buffer.extend_from_slice(header_value.as_bytes());
292                buffer.extend_from_slice(CRLF_RAW);
293            }
294        }
295
296        buffer
297    } else {
298        buffer
299    }
300}
301
302/// Given an optional `HeaderMap`, calculate the total number of bytes required to represent the
303/// `HeaderMap`. If no `HeaderMap` is given as input, return 0.
304///
305/// - Trailer names are separated by a single colon only, no space.
306/// - Trailer names with multiple values will be written out one line per value, with the name
307///   appearing on each line.
308fn total_rendered_length_of_trailers(trailer_map: Option<&http_1x::HeaderMap>) -> u64 {
309    match trailer_map {
310        Some(trailer_map) => trailer_map
311            .iter()
312            .map(|(trailer_name, trailer_value)| {
313                trailer_name.as_str().len()
314                    + TRAILER_SEPARATOR.len()
315                    + trailer_value.len()
316                    + CRLF_RAW.len()
317            })
318            .sum::<usize>() as u64,
319        None => 0,
320    }
321}
322
323/// This is an ugly return type, but in practice it just returns `Ok(())` if the values match
324/// and `Err(Poll::Ready(Some(Err(AwsChunkedBodyError::StreamLengthMismatch))))` if they don't
325#[allow(clippy::type_complexity)]
326fn check_for_stream_length_mismatch(
327    actual_stream_length: u64,
328    expected_stream_length: u64,
329) -> Result<(), Poll<Option<Result<http_body_1x::Frame<Bytes>, aws_smithy_types::body::Error>>>> {
330    if actual_stream_length != expected_stream_length {
331        let err = Box::new(AwsChunkedBodyError::StreamLengthMismatch {
332            actual: actual_stream_length,
333            expected: expected_stream_length,
334        });
335        return Err(Poll::Ready(Some(Err(err))));
336    };
337
338    Ok(())
339}
340
341#[cfg(test)]
342mod tests {
343    use super::{total_rendered_length_of_trailers, trailers_as_aws_chunked_bytes};
344    use crate::content_encoding::{
345        AwsChunkedBody, AwsChunkedBodyOptions, CHUNK_TERMINATOR_RAW, CRLF_RAW,
346        DEFAULT_CHUNK_SIZE_BYTE,
347    };
348
349    use aws_smithy_types::body::SdkBody;
350    use bytes::{Buf, Bytes, BytesMut};
351    use bytes_utils::SegmentedBuf;
352    use http_1x::{HeaderMap, HeaderValue};
353    use http_body_1x::{Body, Frame, SizeHint};
354    use http_body_util::BodyExt;
355    use pin_project_lite::pin_project;
356
357    use std::io::Read;
358    use std::pin::Pin;
359    use std::task::{Context, Poll};
360    use std::time::Duration;
361
362    pin_project! {
363        struct SputteringBody {
364            parts: Vec<Option<Bytes>>,
365            cursor: usize,
366            delay_in_millis: u64,
367        }
368    }
369
370    impl SputteringBody {
371        fn len(&self) -> usize {
372            self.parts.iter().flatten().map(|b| b.len()).sum()
373        }
374    }
375
376    impl Body for SputteringBody {
377        type Data = Bytes;
378        type Error = aws_smithy_types::body::Error;
379
380        fn poll_frame(
381            self: Pin<&mut Self>,
382            cx: &mut Context<'_>,
383        ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
384            if self.cursor == self.parts.len() {
385                return Poll::Ready(None);
386            }
387
388            let this = self.project();
389            let delay_in_millis = *this.delay_in_millis;
390            let next_part = this.parts.get_mut(*this.cursor).unwrap().take();
391
392            match next_part {
393                None => {
394                    *this.cursor += 1;
395                    let waker = cx.waker().clone();
396                    tokio::spawn(async move {
397                        tokio::time::sleep(Duration::from_millis(delay_in_millis)).await;
398                        waker.wake();
399                    });
400                    Poll::Pending
401                }
402                Some(data) => {
403                    *this.cursor += 1;
404                    let frame = Frame::data(data);
405                    Poll::Ready(Some(Ok(frame)))
406                }
407            }
408        }
409
410        fn is_end_stream(&self) -> bool {
411            false
412        }
413
414        fn size_hint(&self) -> SizeHint {
415            SizeHint::new()
416        }
417    }
418
419    // Custom body that returns data and trailers
420    pin_project! {
421        struct TestBodyWithTrailers {
422            data: Option<Bytes>,
423            trailers: Option<HeaderMap>,
424        }
425    }
426
427    impl Body for TestBodyWithTrailers {
428        type Data = Bytes;
429        type Error = aws_smithy_types::body::Error;
430
431        fn poll_frame(
432            self: Pin<&mut Self>,
433            _cx: &mut Context<'_>,
434        ) -> Poll<Option<Result<http_body_1x::Frame<Self::Data>, Self::Error>>> {
435            let this = self.project();
436
437            if let Some(data) = this.data.take() {
438                return Poll::Ready(Some(Ok(http_body_1x::Frame::data(data))));
439            }
440
441            if let Some(trailers) = this.trailers.take() {
442                return Poll::Ready(Some(Ok(http_body_1x::Frame::trailers(trailers))));
443            }
444
445            Poll::Ready(None)
446        }
447    }
448
449    #[tokio::test]
450    async fn test_aws_chunked_encoding() {
451        let test_fut = async {
452            let input_str = "Hello world";
453            let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![]);
454            let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
455
456            let mut output: SegmentedBuf<Bytes> = SegmentedBuf::new();
457            while let Some(Ok(buf)) = body.frame().await {
458                output.push(buf.into_data().unwrap());
459            }
460
461            let mut actual_output = String::new();
462            output
463                .reader()
464                .read_to_string(&mut actual_output)
465                .expect("Doesn't cause IO errors");
466
467            let expected_output = "B\r\nHello world\r\n0\r\n\r\n";
468
469            assert_eq!(expected_output, actual_output);
470
471            // You can insert a `tokio::time::sleep` here to verify the timeout works as intended
472        };
473
474        let timeout_duration = Duration::from_secs(3);
475        if tokio::time::timeout(timeout_duration, test_fut)
476            .await
477            .is_err()
478        {
479            panic!("test_aws_chunked_encoding timed out after {timeout_duration:?}");
480        }
481    }
482
483    #[tokio::test]
484    async fn test_aws_chunked_encoding_sputtering_body() {
485        let test_fut = async {
486            let input = SputteringBody {
487                parts: vec![
488                    Some(Bytes::from_static(b"chunk 1, ")),
489                    None,
490                    Some(Bytes::from_static(b"chunk 2, ")),
491                    Some(Bytes::from_static(b"chunk 3, ")),
492                    None,
493                    None,
494                    Some(Bytes::from_static(b"chunk 4, ")),
495                    Some(Bytes::from_static(b"chunk 5, ")),
496                    Some(Bytes::from_static(b"chunk 6")),
497                ],
498                cursor: 0,
499                delay_in_millis: 500,
500            };
501            let opts = AwsChunkedBodyOptions::new(input.len() as u64, vec![]);
502            let mut body = AwsChunkedBody::new(input, opts);
503
504            let mut output: SegmentedBuf<Bytes> = SegmentedBuf::new();
505            while let Some(Ok(buf)) = body.frame().await {
506                output.push(buf.into_data().unwrap());
507            }
508
509            let mut actual_output = String::new();
510            output
511                .reader()
512                .read_to_string(&mut actual_output)
513                .expect("Doesn't cause IO errors");
514
515            let expected_output =
516                "34\r\nchunk 1, chunk 2, chunk 3, chunk 4, chunk 5, chunk 6\r\n0\r\n\r\n";
517
518            assert_eq!(expected_output, actual_output);
519        };
520
521        let timeout_duration = Duration::from_secs(3);
522        if tokio::time::timeout(timeout_duration, test_fut)
523            .await
524            .is_err()
525        {
526            panic!(
527                "test_aws_chunked_encoding_sputtering_body timed out after {timeout_duration:?}"
528            );
529        }
530    }
531
532    #[tokio::test]
533    async fn test_aws_chunked_encoding_incorrect_trailer_length_panic() {
534        let input_str = "Hello world";
535        // Test body has no trailers, so this length is incorrect and will trigger an assert panic
536        // When the panic occurs, it will actually expect a length of 44. This is because, when using
537        // aws-chunked encoding, each trailer will end with a CRLF which is 2 bytes long.
538        let wrong_trailer_len = 42;
539        let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![wrong_trailer_len]);
540        let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
541
542        // We don't care about the body contents but we have to read it all before checking for trailers
543        while let Some(Ok(frame)) = body.frame().await {
544            assert!(!frame.is_trailers());
545        }
546    }
547
548    #[tokio::test]
549    async fn test_aws_chunked_encoding_empty_body() {
550        let input_str = "";
551        let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![]);
552        let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);
553
554        let mut output: SegmentedBuf<Bytes> = SegmentedBuf::new();
555        while let Some(Ok(frame)) = body.frame().await {
556            output.push(frame.into_data().unwrap());
557        }
558
559        let mut actual_output = String::new();
560        output
561            .reader()
562            .read_to_string(&mut actual_output)
563            .expect("Doesn't cause IO errors");
564
565        let actual_output = std::str::from_utf8(actual_output.as_bytes()).unwrap();
566        let expected_output = [CHUNK_TERMINATOR_RAW, CRLF_RAW].concat();
567        let expected_output = std::str::from_utf8(&expected_output).unwrap();
568
569        assert_eq!(expected_output, actual_output);
570    }
571
572    #[tokio::test]
573    async fn test_total_rendered_length_of_trailers() {
574        let mut trailers = HeaderMap::new();
575
576        trailers.insert("empty_value", HeaderValue::from_static(""));
577
578        trailers.insert("single_value", HeaderValue::from_static("value 1"));
579
580        trailers.insert("two_values", HeaderValue::from_static("value 1"));
581        trailers.append("two_values", HeaderValue::from_static("value 2"));
582
583        trailers.insert("three_values", HeaderValue::from_static("value 1"));
584        trailers.append("three_values", HeaderValue::from_static("value 2"));
585        trailers.append("three_values", HeaderValue::from_static("value 3"));
586
587        let trailers = Some(&trailers);
588        let actual_length = total_rendered_length_of_trailers(trailers);
589        let buf = BytesMut::with_capacity(actual_length as usize);
590        let expected_length = (trailers_as_aws_chunked_bytes(trailers, buf).len()) as u64;
591
592        assert_eq!(expected_length, actual_length);
593    }
594
595    #[tokio::test]
596    async fn test_total_rendered_length_of_empty_trailers() {
597        let header_map = HeaderMap::new();
598        let trailers = Some(&header_map);
599        let actual_length = total_rendered_length_of_trailers(trailers);
600        let buf = BytesMut::with_capacity(actual_length as usize);
601        let expected_length = (trailers_as_aws_chunked_bytes(trailers, buf).len()) as u64;
602
603        assert_eq!(expected_length, actual_length);
604    }
605
606    #[tokio::test]
607    async fn test_poll_frame_with_default_chunk_size() {
608        let test_data = Bytes::from("1234567890123456789012345");
609        let body = SdkBody::from(test_data.clone());
610        let options = AwsChunkedBodyOptions::new(test_data.len() as u64, vec![]);
611        let mut chunked_body = AwsChunkedBody::new(body, options);
612
613        let mut data_frames: Vec<Bytes> = Vec::new();
614        while let Some(frame) = chunked_body.frame().await.transpose().unwrap() {
615            if let Ok(data) = frame.into_data() {
616                data_frames.push(data);
617            }
618        }
619
620        assert_eq!(data_frames.len(), 2); // Data fits in one chunk, plus the final chunk
621        assert_eq!(
622            Bytes::from_static(b"19\r\n1234567890123456789012345\r\n"),
623            data_frames[0]
624        );
625        assert_eq!(Bytes::from_static(b"0\r\n\r\n"), data_frames[1]);
626    }
627
628    #[tokio::test]
629    async fn test_poll_frame_with_custom_chunk_size() {
630        let test_data = Bytes::from("1234567890123456789012345");
631        let body = SdkBody::from(test_data.clone());
632        let options =
633            AwsChunkedBodyOptions::new(test_data.len() as u64, vec![]).with_chunk_size(10);
634        let mut chunked_body = AwsChunkedBody::new(body, options);
635
636        let mut data_frames: Vec<Bytes> = Vec::new();
637        while let Some(frame) = chunked_body.frame().await.transpose().unwrap() {
638            if let Ok(data) = frame.into_data() {
639                data_frames.push(data);
640            }
641        }
642
643        assert_eq!(4, data_frames.len()); // 25 bytes / 10 = 2.5 so 3 chunks, plus the final chunk
644        assert_eq!(Bytes::from_static(b"A\r\n1234567890\r\n"), data_frames[0]);
645        assert_eq!(Bytes::from_static(b"A\r\n1234567890\r\n"), data_frames[1]);
646        assert_eq!(Bytes::from_static(b"5\r\n12345\r\n"), data_frames[2]);
647        assert_eq!(Bytes::from_static(b"0\r\n\r\n"), data_frames[3]);
648    }
649
650    #[tokio::test]
651    async fn test_poll_frame_with_trailers() {
652        let data = Bytes::from("1234567890123456789012345");
653        let stream_len = data.len() as u64;
654        let mut trailers = HeaderMap::new();
655        trailers.insert("x-amz-checksum-crc32", HeaderValue::from_static("78DeVw=="));
656        let body = TestBodyWithTrailers {
657            data: Some(data),
658            trailers: Some(trailers),
659        };
660        let options = AwsChunkedBodyOptions::new(stream_len, vec![29]).with_chunk_size(10);
661        let mut chunked_body = AwsChunkedBody::new(body, options);
662
663        let mut data_frames: Vec<Bytes> = Vec::new();
664        while let Some(frame) = chunked_body.frame().await.transpose().unwrap() {
665            if let Ok(data) = frame.into_data() {
666                data_frames.push(data);
667            }
668        }
669
670        assert_eq!(4, data_frames.len()); // 25 bytes / 10 = 2.5 so 3 chunks, plus the final chunk
671        assert_eq!(Bytes::from_static(b"A\r\n1234567890\r\n"), data_frames[0]);
672        assert_eq!(Bytes::from_static(b"A\r\n1234567890\r\n"), data_frames[1]);
673        assert_eq!(Bytes::from_static(b"5\r\n12345\r\n"), data_frames[2]);
674        assert_eq!(
675            Bytes::from_static(b"0\r\nx-amz-checksum-crc32:78DeVw==\r\n\r\n"),
676            data_frames[3]
677        );
678    }
679
680    // Testing scenario derived from https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
681    #[tokio::test]
682    async fn test_aws_chunked_body_poll_frame_with_signer() {
683        use crate::auth::sigv4::SigV4MessageSigner;
684        use aws_credential_types::Credentials;
685        use aws_sigv4::http_request::SigningSettings;
686        use aws_smithy_async::time::{SharedTimeSource, StaticTimeSource};
687        use aws_types::region::SigningRegion;
688        use aws_types::SigningName;
689        use std::time::{Duration, UNIX_EPOCH};
690
691        // 65KB of 'a' characters
692        let data = "a".repeat(65 * 1024);
693        let stream_len = data.len() as u64;
694        let inner_body = SdkBody::from(data);
695
696        // `StaticTimeSource` for 20130524T000000Z
697        let time = StaticTimeSource::new(UNIX_EPOCH + Duration::from_secs(1369353600));
698        let shared_time = SharedTimeSource::from(time);
699
700        let credentials = Credentials::new(
701            "AKIAIOSFODNN7EXAMPLE",
702            "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
703            None,
704            None,
705            "test",
706        );
707
708        let seed_signature =
709            "4f232c4386841ef735655705268965c44a0e4690baa4adea153f7db9fa80a0a9".to_owned();
710        let signer = SigV4MessageSigner::new(
711            seed_signature,
712            credentials.into(),
713            SigningRegion::from_static("us-east-1"),
714            SigningName::from_static("s3"),
715            shared_time,
716            SigningSettings::default(),
717        );
718
719        let opt = AwsChunkedBodyOptions::new(stream_len, vec![]).signed_chunked_encoding(true);
720        let mut chunked_body = AwsChunkedBody::new(inner_body, opt).with_signer(signer);
721
722        let mut data_frames: Vec<Bytes> = Vec::new();
723        while let Some(frame) = chunked_body.frame().await.transpose().unwrap() {
724            if let Ok(data) = frame.into_data() {
725                data_frames.push(data);
726            }
727        }
728
729        assert_eq!(3, data_frames.len()); // 64 KB, 1 KB, and the final chunk with 0 bytes of chunk data.
730        assert!(data_frames[0].starts_with(b"10000;chunk-signature=ad80c730a21e5b8d04586a2213dd63b9a0e99e0e2307b0ade35a65485a288648\r\n"));
731        assert!(data_frames[1].starts_with(b"400;chunk-signature=0055627c9e194cb4542bae2aa5492e3c1575bbb81b612b7d234b86a503ef5497\r\n"));
732        assert_eq!(data_frames[2], Bytes::from_static(b"0;chunk-signature=b6c6ea8a5354eaf15b3cb7646744f4275b71ea724fed81ceb9323e279d449df9\r\n\r\n"));
733    }
734
735    // Testing scenario derived from https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming-trailers.html
736    #[tokio::test]
737    async fn test_aws_chunked_body_poll_frame_with_signer_and_trailers() {
738        use crate::auth::sigv4::SigV4MessageSigner;
739        use aws_credential_types::Credentials;
740        use aws_sigv4::http_request::SigningSettings;
741        use aws_smithy_async::time::{SharedTimeSource, StaticTimeSource};
742        use aws_types::region::SigningRegion;
743        use aws_types::SigningName;
744        use std::time::{Duration, UNIX_EPOCH};
745
746        // 65KB of 'a' characters
747        let data = "a".repeat(65 * 1024);
748        let stream_len = data.len() as u64;
749
750        // Set trailers with x-amz-checksum-crc32c header
751        let mut trailers = HeaderMap::new();
752        trailers.insert(
753            "x-amz-checksum-crc32c",
754            HeaderValue::from_static("sOO8/Q=="),
755        );
756
757        let inner_body = TestBodyWithTrailers {
758            data: Some(Bytes::from(data)),
759            trailers: Some(trailers),
760        };
761
762        // `StaticTimeSource` for 20130524T000000Z
763        let time = StaticTimeSource::new(UNIX_EPOCH + Duration::from_secs(1369353600));
764        let shared_time = SharedTimeSource::from(time);
765
766        let credentials = Credentials::new(
767            "AKIAIOSFODNN7EXAMPLE",
768            "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
769            None,
770            None,
771            "test",
772        );
773
774        let seed_signature =
775            "106e2a8a18243abcf37539882f36619c00e2dfc72633413f02d3b74544bfeb8e".to_owned();
776        let signer = SigV4MessageSigner::new(
777            seed_signature,
778            credentials.into(),
779            SigningRegion::from_static("us-east-1"),
780            SigningName::from_static("s3"),
781            shared_time,
782            SigningSettings::default(),
783        );
784
785        let opt =
786            AwsChunkedBodyOptions::new(stream_len, vec![30, 88]).signed_chunked_encoding(true);
787        let mut chunked_body = AwsChunkedBody::new(inner_body, opt).with_signer(signer);
788
789        let mut data_frames: Vec<Bytes> = Vec::new();
790        while let Some(frame) = chunked_body.frame().await.transpose().unwrap() {
791            if let Ok(data) = frame.into_data() {
792                data_frames.push(data);
793            }
794        }
795
796        assert_eq!(4, data_frames.len()); // 64 KB, 1 KB, 0 bytes of chunk data, and the trailer chunk.
797        assert!(data_frames[0].starts_with(b"10000;chunk-signature=b474d8862b1487a5145d686f57f013e54db672cee1c953b3010fb58501ef5aa2\r\n"));
798        assert!(data_frames[1].starts_with(b"400;chunk-signature=1c1344b170168f8e65b41376b44b20fe354e373826ccbbe2c1d40a8cae51e5c7\r\n"));
799        assert_eq!(data_frames[2], Bytes::from_static(b"0;chunk-signature=2ca2aba2005185cf7159c6277faf83795951dd77a3a99e6e65d5c9f85863f992\r\n"));
800        assert_eq!(data_frames[3], Bytes::from_static(b"x-amz-checksum-crc32c:sOO8/Q==\r\nx-amz-trailer-signature:d81f82fc3505edab99d459891051a732e8730629a2e4a59689829ca17fe2e435\r\n\r\n"));
801    }
802
803    #[test]
804    fn test_unsigned_encoded_length_with_no_trailer() {
805        {
806            let options = AwsChunkedBodyOptions::new(10, vec![]);
807            /*
808             A\r\n
809             10 bytes of data\r\n
810             0\r\n
811             \r\n
812             -------------------------------------------------------------
813             1 (A) + 2 (\r\n) +
814             10 (data) + 2 (\r\n) +
815             1 (0) + 2 (\r\n) +
816             2 (\r\n)
817
818                = 20 total bytes
819            */
820            assert_eq!(options.encoded_length(), 20);
821        }
822        {
823            let options = AwsChunkedBodyOptions::new((DEFAULT_CHUNK_SIZE_BYTE + 10) as u64, vec![]);
824            /*
825             10000\r\n
826             65536 bytes of data\r\n
827             A\r\n
828             10 bytes of data\r\n
829             0\r\n
830             \r\n
831             -------------------------------------------------------------
832             5 (10000) + 2 (\r\n) +
833             65536 (data) + 2 (\r\n) +
834             1 (A) + 2 (\r\n) +
835             10 (data) + 2 (\r\n) +
836             1 (0) + 2 (\r\n) +
837             2 (\r\n)
838
839                = 65565 total bytes
840            */
841            assert_eq!(options.encoded_length(), 65565);
842        }
843    }
844
845    #[test]
846    fn test_unsigned_encoded_length_with_trailer() {
847        let options = AwsChunkedBodyOptions::new(10, vec![30]);
848        /*
849            A\r\n
850            10 bytes of data\r\n
851            0\r\n
852            x-amz-checksum-crc32c:sOO8/Q==\r\n
853            \r\n
854            -------------------------------------------------------------
855            1 (A) + 2 (\r\n) +
856            10 (data) + 2 (\r\n) +
857            1 (0) + 2 (\r\n) +
858            21 (x-amz-checksum-crc32c) + 1 (:) + 8 (sOO8/Q==) + 2 (\r\n) +
859            2 (\r\n)
860
861                = 52 total bytes
862        */
863        assert_eq!(options.encoded_length(), 52);
864    }
865
866    #[test]
867    fn test_signed_encoded_length_with_no_trailer() {
868        {
869            let options = AwsChunkedBodyOptions::new(10, vec![]).signed_chunked_encoding(true);
870            /*
871             A;chunk-signature=<signature>\r\n
872             10 bytes of data\r\n
873             0;chunk-signature=<signature>\r\n
874             \r\n
875             -------------------------------------------------------------
876             1 (A) + 17 (;chunk-signature=) + 64 (signature) + 2 (\r\n) +
877             10 (data) + 2 (\r\n) +
878             1 (0) + 17 (;chunk-signature) + 64 (signature) + 2 (\r\n) +
879             2 (\r\n)
880
881                = 182 total bytes
882            */
883            assert_eq!(options.encoded_length(), 182);
884        }
885        {
886            let options = AwsChunkedBodyOptions::new((DEFAULT_CHUNK_SIZE_BYTE + 10) as u64, vec![])
887                .signed_chunked_encoding(true);
888            /*
889             10000;chunk-signature=<signature>\r\n
890             65536 bytes of data\r\n
891             A;chunk-signature=<signature>\r\n
892             10 bytes of data\r\n
893             0;chunk-signature=<signature>\r\n
894             \r\n
895             -------------------------------------------------------------
896             5 (10000) + 17 (;chunk-signature=) + 64 (signature) + 2 (\r\n) +
897             65536 (data) + 2 (\r\n) +
898             1 (A) + 17 (;chunk-signature=) + 64 (signature) + 2 (\r\n) +
899             10 (data) + 2 (\r\n) +
900             1 (0) + 17 (;chunk-signature) + 64 (signature) + 2 (\r\n) +
901             2 (\r\n)
902
903                = 65808 total bytes
904            */
905            assert_eq!(options.encoded_length(), 65808);
906        }
907    }
908
909    #[test]
910    fn test_signed_encoded_length_with_trailer() {
911        let options = AwsChunkedBodyOptions::new(10, vec![30, 88]).signed_chunked_encoding(true);
912        /*
913            A;chunk-signature=<signature>\r\n
914            10 bytes of data\r\n
915            0;chunk-signature=<signature>\r\n
916            x-amz-checksum-crc32c:sOO8/Q==\r\n
917            x-amz-trailer-signature:<signature>\r\n
918            \r\n
919            -------------------------------------------------------------
920            1 (A) + 17 (;chunk-signature=) + 64 (signature) + 2 (\r\n) +
921            10 (data) + 2 (\r\n) +
922            1 (0) + 17 (;chunk-signature) + 64 (signature) + 2 (\r\n) +
923            21 (x-amz-checksum-crc32c) + 1 (:) + 8 (sOO8/Q==) + 2 (\r\n) +
924            23 (x-amz-trailer-signature) + 1 (:) + 64 (signature) + 2 (\r\n) +
925            2 (\r\n)
926
927                = 304 total bytes
928        */
929        assert_eq!(options.encoded_length(), 304);
930    }
931}