1use aws_smithy_types::config_bag::{Storable, StoreReplace};
7use bytes::{Bytes, BytesMut};
8use pin_project_lite::pin_project;
9
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
// Framing tokens used while rendering the encoded body. Each textual token is
// declared once as `&str`; the `_RAW` twin is the same bytes viewed as `&[u8]`.

/// Line terminator written after the chunk size, the chunk data, every trailer
/// and at the very end of the body.
const CRLF: &str = "\r\n";
/// [`CRLF`] as raw bytes.
const CRLF_RAW: &[u8] = CRLF.as_bytes();

/// The zero-length chunk (`0` followed by CRLF) written once the data is done.
const CHUNK_TERMINATOR: &str = "0\r\n";
/// [`CHUNK_TERMINATOR`] as raw bytes.
const CHUNK_TERMINATOR_RAW: &[u8] = CHUNK_TERMINATOR.as_bytes();

/// Byte sequence written between a trailer's name and its value.
const TRAILER_SEPARATOR: &[u8] = b":";
20
/// String constants grouped under the `header_value` name.
pub mod header_value {
    /// The literal `aws-chunked`.
    // NOTE(review): presumably sent as a `Content-Encoding` header value; the
    // call sites are not visible in this file — confirm against callers.
    pub const AWS_CHUNKED: &str = "aws-chunked";
}
26
/// Options used when constructing an [`AwsChunkedBody`].
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct AwsChunkedBodyOptions {
    /// Total number of bytes the wrapped body is expected to yield. It is
    /// written out as the (single) chunk size and later compared against the
    /// number of bytes actually read from the inner body.
    stream_length: u64,
    /// Length of each trailer that will be emitted, one entry per trailer.
    /// A CRLF per entry is added on top of these values when totals are
    /// computed (see `total_trailer_length` / `encoded_length`).
    trailer_lengths: Vec<u64>,
    /// Flag reported by [`AwsChunkedBodyOptions::disabled`]; only
    /// `disable_chunked_encoding()` sets it to `true`.
    disabled: bool,
}
42
// Allows `AwsChunkedBodyOptions` to be kept in a config bag.
// NOTE(review): `StoreReplace` presumably means a newly stored value replaces
// the previous one — confirm against the `aws_smithy_types::config_bag` docs.
impl Storable for AwsChunkedBodyOptions {
    type Storer = StoreReplace<Self>;
}
46
47impl AwsChunkedBodyOptions {
48 pub fn new(stream_length: u64, trailer_lengths: Vec<u64>) -> Self {
50 Self {
51 stream_length,
52 trailer_lengths,
53 disabled: false,
54 }
55 }
56
57 fn total_trailer_length(&self) -> u64 {
58 self.trailer_lengths.iter().sum::<u64>()
59 + (self.trailer_lengths.len() * CRLF.len()) as u64
61 }
62
63 pub fn with_stream_length(mut self, stream_length: u64) -> Self {
65 self.stream_length = stream_length;
66 self
67 }
68
69 pub fn with_trailer_len(mut self, trailer_len: u64) -> Self {
71 self.trailer_lengths.push(trailer_len);
72 self
73 }
74
75 pub fn disable_chunked_encoding() -> Self {
79 Self {
80 disabled: true,
81 ..Default::default()
82 }
83 }
84
85 pub fn disabled(&self) -> bool {
87 self.disabled
88 }
89
90 pub fn encoded_length(&self) -> u64 {
92 let mut length = 0;
93 if self.stream_length != 0 {
94 length += get_unsigned_chunk_bytes_length(self.stream_length);
95 }
96
97 length += CHUNK_TERMINATOR.len() as u64;
99
100 for len in self.trailer_lengths.iter() {
102 length += len + CRLF.len() as u64;
103 }
104
105 length += CRLF.len() as u64;
107
108 length
109 }
110}
111
/// Progress of an [`AwsChunkedBody`] through the encoding.
#[derive(Debug, PartialEq, Eq)]
enum AwsChunkedBodyState {
    /// Initial state. The next poll emits the hex chunk size followed by CRLF
    /// and moves to `WritingChunk`; for an empty stream it emits the chunk
    /// terminator instead and moves straight to `WritingTrailers`.
    WritingChunkSize,
    /// Data from the inner body is being passed through. Once the inner body
    /// runs out of data the stream length is checked and the chunk terminator
    /// is written.
    WritingChunk,
    /// The chunk terminator has been written; trailers and/or the closing
    /// CRLF are still owed.
    WritingTrailers,
    /// Everything has been written; polling yields `None` from here on.
    Closed,
}
127
pin_project! {
    #[derive(Debug)]
    /// A body wrapper that renders the inner body as a single chunk: the chunk
    /// size in hex, the inner body's data, a zero-length terminating chunk,
    /// any trailers as `name:value` lines, and a closing CRLF.
    ///
    /// For example an inner body of `Hello world` with no trailers is rendered
    /// as `B\r\nHello world\r\n0\r\n\r\n`.
    pub struct AwsChunkedBody<InnerBody> {
        // The wrapped body whose data (and trailers) are encoded.
        #[pin]
        inner: InnerBody,
        // Where in the encoding the next poll continues.
        #[pin]
        state: AwsChunkedBodyState,
        // Declared stream/trailer lengths used for framing and validation.
        options: AwsChunkedBodyOptions,
        // Running total of data bytes received from `inner`; compared with
        // `options.stream_length` once `inner` is exhausted.
        inner_body_bytes_read_so_far: usize,
    }
}
159
160impl<Inner> AwsChunkedBody<Inner> {
161 pub fn new(body: Inner, options: AwsChunkedBodyOptions) -> Self {
163 Self {
164 inner: body,
165 state: AwsChunkedBodyState::WritingChunkSize,
166 options,
167 inner_body_bytes_read_so_far: 0,
168 }
169 }
170}
171
#[cfg(feature = "http-02x")]
impl<Inner> http_body_04x::Body for AwsChunkedBody<Inner>
where
    Inner: http_body_04x::Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
{
    type Data = Bytes;
    type Error = aws_smithy_types::body::Error;

    /// Drive the encoder one step. Each call emits at most one piece of the
    /// encoded body, chosen by the current state:
    ///
    /// * `WritingChunkSize` – the hex chunk size + CRLF, or just the chunk
    ///   terminator when the stream is empty.
    /// * `WritingChunk` – data from the inner body, passed through unchanged;
    ///   when the inner body ends, CRLF + the chunk terminator.
    /// * `WritingTrailers` – the inner body's trailers rendered as data,
    ///   followed by the closing CRLF.
    /// * `Closed` – `None`.
    ///
    /// Errors are returned when the number of bytes read differs from the
    /// declared stream length, or when the rendered trailers differ in length
    /// from what the options declared.
    fn poll_data(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
        tracing::trace!(state = ?self.state, "polling AwsChunkedBody");
        let mut this = self.project();

        match *this.state {
            // Empty stream: there is no data chunk, go straight to the terminator.
            AwsChunkedBodyState::WritingChunkSize if this.options.stream_length == 0 => {
                *this.state = AwsChunkedBodyState::WritingTrailers;
                tracing::trace!("stream is empty, writing chunk terminator");
                Poll::Ready(Some(Ok(Bytes::from(CHUNK_TERMINATOR))))
            }
            AwsChunkedBodyState::WritingChunkSize => {
                *this.state = AwsChunkedBodyState::WritingChunk;
                // The whole stream is sent as one chunk, so its size is the stream length.
                let chunk_size = format!("{:X?}{CRLF}", this.options.stream_length);
                tracing::trace!(%chunk_size, "writing chunk size");
                Poll::Ready(Some(Ok(Bytes::from(chunk_size))))
            }
            AwsChunkedBodyState::WritingChunk => {
                let polled = match this.inner.poll_data(cx) {
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(polled) => polled,
                };

                match polled {
                    Some(Err(e)) => Poll::Ready(Some(Err(e))),
                    Some(Ok(data)) => {
                        tracing::trace!(len = data.len(), "writing chunk data");
                        *this.inner_body_bytes_read_so_far += data.len();
                        Poll::Ready(Some(Ok(data)))
                    }
                    None => {
                        // The inner body is done: it must have produced exactly
                        // the number of bytes announced in the chunk size.
                        let expected = this.options.stream_length;
                        let actual = *this.inner_body_bytes_read_so_far as u64;
                        if actual != expected {
                            let err = Box::new(AwsChunkedBodyError::StreamLengthMismatch {
                                actual,
                                expected,
                            });
                            return Poll::Ready(Some(Err(err)));
                        }

                        tracing::trace!("no more chunk data, writing CRLF and chunk terminator");
                        *this.state = AwsChunkedBodyState::WritingTrailers;
                        let closing = [CRLF, CHUNK_TERMINATOR].concat();
                        Poll::Ready(Some(Ok(Bytes::from(closing))))
                    }
                }
            }
            AwsChunkedBodyState::WritingTrailers => match this.inner.poll_trailers(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
                Poll::Ready(Ok(trailers)) => {
                    *this.state = AwsChunkedBodyState::Closed;
                    let rendered_length =
                        http_02x_utils::total_rendered_length_of_trailers(trailers.as_ref());
                    let reported_length = this.options.total_trailer_length();

                    if rendered_length != reported_length {
                        // NOTE(review): `actual` carries the length reported via the
                        // options and `expected` the length actually rendered. That is
                        // the reverse of the http-body 1.x implementation and of how the
                        // `Display` message words these fields. Left as-is here because an
                        // existing test pins this exact output.
                        let err = Box::new(AwsChunkedBodyError::ReportedTrailerLengthMismatch {
                            actual: reported_length,
                            expected: rendered_length,
                        });
                        return Poll::Ready(Some(Err(err)));
                    }

                    let mut rendered = http_02x_utils::trailers_as_aws_chunked_bytes(
                        trailers,
                        reported_length + 1,
                    );
                    // The closing CRLF ends the encoded body.
                    rendered.extend_from_slice(CRLF.as_bytes());

                    Poll::Ready(Some(Ok(rendered.into())))
                }
            },
            AwsChunkedBodyState::Closed => Poll::Ready(None),
        }
    }

    /// Always reports no trailers: the inner body's trailers are rendered into
    /// the data stream by `poll_data` instead.
    fn poll_trailers(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Result<Option<http_02x::HeaderMap<http_02x::HeaderValue>>, Self::Error>> {
        Poll::Ready(Ok(None))
    }

    /// `true` once the encoder has reached the `Closed` state.
    fn is_end_stream(&self) -> bool {
        matches!(self.state, AwsChunkedBodyState::Closed)
    }

    /// Exact size of the encoded body as computed from the options.
    fn size_hint(&self) -> http_body_04x::SizeHint {
        let encoded_length = self.options.encoded_length();
        http_body_04x::SizeHint::with_exact(encoded_length)
    }
}
279
/// Trailer rendering helpers for the `http` 0.2 / `http-body` 0.4 implementation.
#[cfg(feature = "http-02x")]
mod http_02x_utils {
    use super::{CRLF, TRAILER_SEPARATOR};
    use bytes::BytesMut;
    use http_02x::HeaderMap;

    /// Render every trailer as `name:value` followed by CRLF.
    ///
    /// `estimated_length` is only used to pre-size the output buffer; if it
    /// does not fit in `usize` the buffer starts with zero capacity. `None`
    /// yields an empty buffer.
    pub(super) fn trailers_as_aws_chunked_bytes(
        trailer_map: Option<HeaderMap>,
        estimated_length: u64,
    ) -> BytesMut {
        let trailer_map = match trailer_map {
            Some(map) => map,
            None => return BytesMut::new(),
        };

        let capacity = usize::try_from(estimated_length).unwrap_or_default();
        let mut rendered = BytesMut::with_capacity(capacity);
        let mut last_name = None;

        for (maybe_name, value) in trailer_map {
            // By-value iteration yields the name only with the first value of a
            // multi-valued header; later values reuse the most recent name.
            if maybe_name.is_some() {
                last_name = maybe_name;
            }

            if let Some(name) = last_name.as_ref() {
                let pieces: [&[u8]; 4] = [
                    name.as_str().as_bytes(),
                    TRAILER_SEPARATOR,
                    value.as_bytes(),
                    CRLF.as_bytes(),
                ];
                for piece in pieces {
                    rendered.extend_from_slice(piece);
                }
            }
        }

        rendered
    }

    /// Number of bytes `trailers_as_aws_chunked_bytes` would produce for the
    /// given trailers: name + separator + value + CRLF per value. `None`
    /// counts as zero.
    pub(super) fn total_rendered_length_of_trailers(trailer_map: Option<&HeaderMap>) -> u64 {
        let rendered_length = trailer_map.map_or(0, |map| {
            map.iter().fold(0usize, |total, (name, value)| {
                total + name.as_str().len() + TRAILER_SEPARATOR.len() + value.len() + CRLF.len()
            })
        });

        rendered_length as u64
    }
}
344
// Message passed to `unreachable!` in `poll_frame` for the `Closed` and
// `WritingChunkSize` states, both of which return before the inner body is polled.
const UNREACHABLE_STATES: &str = "These states already short circuited";
346
impl<Inner> http_body_1x::Body for AwsChunkedBody<Inner>
where
    Inner: http_body_1x::Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
{
    type Data = Bytes;
    type Error = aws_smithy_types::body::Error;

    /// `true` once the encoder has reached the `Closed` state.
    fn is_end_stream(&self) -> bool {
        self.state == AwsChunkedBodyState::Closed
    }

    /// Exact size of the encoded body as computed from the options.
    fn size_hint(&self) -> http_body_1x::SizeHint {
        http_body_1x::SizeHint::with_exact(self.options.encoded_length())
    }

    /// Drive the encoder one step. Every frame returned is a *data* frame:
    /// the chunk size, the inner body's data, the chunk terminator, rendered
    /// trailers and the closing CRLF are all emitted as data.
    ///
    /// `WritingChunkSize` and `Closed` are handled up front without touching
    /// the inner body; the remaining two states poll the inner body and react
    /// to what it produced.
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<http_body_1x::Frame<Self::Data>, Self::Error>>> {
        tracing::trace!(state = ?self.state, "polling AwsChunkedBody");
        let mut this = self.project();

        // Both branches below produce their frame without polling the inner
        // body, so they return early.
        if *this.state == AwsChunkedBodyState::WritingChunkSize {
            if this.options.stream_length == 0 {
                // Empty stream: no data chunk, only the terminator.
                tracing::trace!("stream is empty, writing chunk terminator");
                let frame = http_body_1x::Frame::data(Bytes::from(CHUNK_TERMINATOR));
                *this.state = AwsChunkedBodyState::WritingTrailers;
                return Poll::Ready(Some(Ok(frame)));
            } else {
                // The whole stream is one chunk; its size is written in hex.
                let chunk_size = format!(
                    "{:X?}{}",
                    this.options.stream_length,
                    std::str::from_utf8(CRLF_RAW).unwrap()
                );
                tracing::trace!(%chunk_size, "writing chunk size");
                let chunk_size = http_body_1x::Frame::data(Bytes::from(chunk_size));
                *this.state = AwsChunkedBodyState::WritingChunk;
                return Poll::Ready(Some(Ok(chunk_size)));
            }
        }

        // Nothing left to write.
        if *this.state == AwsChunkedBodyState::Closed {
            return Poll::Ready(None);
        }

        // From here on the state is `WritingChunk` or `WritingTrailers`.
        let maybe_frame = this.inner.poll_frame(cx);
        tracing::trace!(poll_state = ?maybe_frame, "Polling InnerBody");

        match maybe_frame {
            Poll::Ready(Some(Ok(frame))) => match *this.state {
                AwsChunkedBodyState::WritingChunk => {
                    // A data frame is passed through unchanged; any other frame
                    // marks the end of the data and is treated as trailers.
                    if frame.is_data() {
                        let data = frame.data_ref().expect("Data frame has data");
                        tracing::trace!(len = data.len(), "Writing chunk data");
                        *this.inner_body_bytes_read_so_far += data.len();
                        Poll::Ready(Some(Ok(frame)))
                    } else {
                        tracing::trace!(
                            "No more chunk data, writing CRLF + CHUNK_TERMINATOR to end the data, and the first trailer frame"
                        );

                        // The data is complete, so the byte count must match the
                        // declared stream length.
                        if let Err(poll_stream_len_err) =
                            http_1x_utils::check_for_stream_length_mismatch(
                                *this.inner_body_bytes_read_so_far as u64,
                                this.options.stream_length,
                            )
                        {
                            return poll_stream_len_err;
                        }

                        *this.state = AwsChunkedBodyState::WritingTrailers;
                        let trailers = frame.trailers_ref();

                        // The rendered trailers must be exactly as long as the
                        // options declared. This check happens only for this
                        // first trailer frame.
                        let actual_length: u64 =
                            http_1x_utils::total_rendered_length_of_trailers(trailers);
                        let expected_length = this.options.total_trailer_length();
                        if expected_length != actual_length {
                            let err =
                                Box::new(AwsChunkedBodyError::ReportedTrailerLengthMismatch {
                                    actual: actual_length,
                                    expected: expected_length,
                                });
                            return Poll::Ready(Some(Err(err)));
                        }

                        // 5 of the 7 spare bytes hold the CRLF + chunk terminator
                        // written next; the trailers are appended after them.
                        let mut buf = BytesMut::with_capacity(actual_length as usize + 7);
                        buf.extend_from_slice(&[CRLF_RAW, CHUNK_TERMINATOR_RAW].concat());

                        let trailers = http_1x_utils::trailers_as_aws_chunked_bytes(trailers, buf);
                        Poll::Ready(Some(Ok(http_body_1x::Frame::data(trailers.into()))))
                    }
                }
                AwsChunkedBodyState::WritingTrailers => {
                    // A further frame after the terminator: render its trailers
                    // (if any) as data. No length validation happens here.
                    let trailers = frame.trailers_ref();
                    let actual_length: u64 =
                        http_1x_utils::total_rendered_length_of_trailers(trailers);
                    let buf = BytesMut::with_capacity(actual_length as usize + 7);
                    let trailers = http_1x_utils::trailers_as_aws_chunked_bytes(trailers, buf);
                    Poll::Ready(Some(Ok(http_body_1x::Frame::data(trailers.into()))))
                }
                AwsChunkedBodyState::Closed | AwsChunkedBodyState::WritingChunkSize => {
                    unreachable!("{}", UNREACHABLE_STATES)
                }
            },
            Poll::Ready(None) => {
                // The inner body is exhausted: write whatever closing bytes the
                // current state still owes, then close.
                let trailers = match *this.state {
                    AwsChunkedBodyState::WritingChunk => {
                        // The inner body ended without a trailer frame.
                        if let Err(poll_stream_len_err) =
                            http_1x_utils::check_for_stream_length_mismatch(
                                *this.inner_body_bytes_read_so_far as u64,
                                this.options.stream_length,
                            )
                        {
                            return poll_stream_len_err;
                        }

                        // 7 bytes: CRLF (2) + chunk terminator (3) + closing CRLF (2).
                        let mut trailers = BytesMut::with_capacity(7);
                        trailers.extend_from_slice(
                            &[CRLF_RAW, CHUNK_TERMINATOR_RAW, CRLF_RAW].concat(),
                        );
                        trailers
                    }
                    AwsChunkedBodyState::WritingTrailers => {
                        // The terminator was already written; only the closing
                        // CRLF remains.
                        let mut trailers = BytesMut::with_capacity(2);
                        trailers.extend_from_slice(CRLF_RAW);
                        trailers
                    }
                    AwsChunkedBodyState::Closed | AwsChunkedBodyState::WritingChunkSize => {
                        unreachable!("{}", UNREACHABLE_STATES)
                    }
                };

                let frame = http_body_1x::Frame::data(trailers.into());
                *this.state = AwsChunkedBodyState::Closed;
                Poll::Ready(Some(Ok(frame)))
            }
            Poll::Pending => Poll::Pending,
            // Errors from the inner body are passed on untouched.
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
        }
    }
}
522mod http_1x_utils {
524 use std::task::Poll;
525
526 use super::{CRLF_RAW, TRAILER_SEPARATOR};
527 use bytes::{Bytes, BytesMut};
528 use http_1x::{HeaderMap, HeaderName};
529
530 pub(super) fn trailers_as_aws_chunked_bytes(
537 trailer_map: Option<&HeaderMap>,
538 mut buffer: BytesMut,
539 ) -> BytesMut {
540 if let Some(trailer_map) = trailer_map {
541 let mut current_header_name: Option<HeaderName> = None;
542
543 for (header_name, header_value) in trailer_map.clone().into_iter() {
544 current_header_name = header_name.or(current_header_name);
548
549 if let Some(header_name) = current_header_name.as_ref() {
551 buffer.extend_from_slice(header_name.as_ref());
552 buffer.extend_from_slice(TRAILER_SEPARATOR);
553 buffer.extend_from_slice(header_value.as_bytes());
554 buffer.extend_from_slice(CRLF_RAW);
555 }
556 }
557
558 buffer
559 } else {
560 buffer
561 }
562 }
563
564 pub(super) fn total_rendered_length_of_trailers(trailer_map: Option<&HeaderMap>) -> u64 {
571 match trailer_map {
572 Some(trailer_map) => trailer_map
573 .iter()
574 .map(|(trailer_name, trailer_value)| {
575 trailer_name.as_str().len()
576 + TRAILER_SEPARATOR.len()
577 + trailer_value.len()
578 + CRLF_RAW.len()
579 })
580 .sum::<usize>() as u64,
581 None => 0,
582 }
583 }
584
585 #[allow(clippy::type_complexity)]
588 pub(super) fn check_for_stream_length_mismatch(
589 actual_stream_length: u64,
590 expected_stream_length: u64,
591 ) -> Result<(), Poll<Option<Result<http_body_1x::Frame<Bytes>, aws_smithy_types::body::Error>>>>
592 {
593 if actual_stream_length != expected_stream_length {
594 let err = Box::new(super::AwsChunkedBodyError::StreamLengthMismatch {
595 actual: actual_stream_length,
596 expected: expected_stream_length,
597 });
598 return Err(Poll::Ready(Some(Err(err))));
599 };
600
601 Ok(())
602 }
603}
604
/// Errors raised while encoding an [`AwsChunkedBody`].
#[derive(Debug)]
enum AwsChunkedBodyError {
    /// The trailers' rendered length differs from the length declared in the
    /// options. The `Display` text reads `expected` as the declared length and
    /// `actual` as the length found while encoding.
    ReportedTrailerLengthMismatch { actual: u64, expected: u64 },
    /// The inner body produced a different number of bytes (`actual`) than the
    /// stream length declared in the options (`expected`).
    StreamLengthMismatch { actual: u64, expected: u64 },
}
618
619impl std::fmt::Display for AwsChunkedBodyError {
620 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
621 match self {
622 Self::ReportedTrailerLengthMismatch { actual, expected } => {
623 write!(f, "When creating this AwsChunkedBody, length of trailers was reported as {expected}. However, when double checking during trailer encoding, length was found to be {actual} instead.")
624 }
625 Self::StreamLengthMismatch { actual, expected } => {
626 write!(f, "When creating this AwsChunkedBody, stream length was reported as {expected}. However, when double checking during body encoding, length was found to be {actual} instead.")
627 }
628 }
629 }
630}
631
// Marker impl: lets `AwsChunkedBodyError` be boxed into the body error type
// returned from polling. No source error is exposed.
impl std::error::Error for AwsChunkedBodyError {}
633
/// Count how many times `i` can be divided by 16 before it is no longer
/// greater than zero, i.e. the number of hexadecimal digits needed to write
/// a positive `i`. Returns `0` for zero (and for anything not above zero).
fn int_log16<T>(mut i: T) -> u64
where
    T: std::ops::DivAssign + PartialOrd + From<u8> + Copy,
{
    let (floor, radix) = (T::from(0), T::from(16));

    // Each yielded item stands for one hex digit stripped off `i`.
    std::iter::from_fn(|| (i > floor).then(|| i /= radix)).count() as u64
}
650
651fn get_unsigned_chunk_bytes_length(payload_length: u64) -> u64 {
652 let hex_repr_len = int_log16(payload_length);
653 hex_repr_len + CRLF.len() as u64 + payload_length + CRLF.len() as u64
654}
655
656#[cfg(test)]
657mod tests {
658
// Tests for the `http-body` 0.4 implementation and its trailer helpers.
#[cfg(test)]
#[cfg(feature = "http-02x")]
mod http_02x_tests {
    use super::super::{
        http_02x_utils::{total_rendered_length_of_trailers, trailers_as_aws_chunked_bytes},
        AwsChunkedBody, AwsChunkedBodyOptions, CHUNK_TERMINATOR, CRLF,
    };

    use aws_smithy_types::body::SdkBody;
    use bytes::{Buf, Bytes};
    use bytes_utils::SegmentedBuf;
    use http_02x::{HeaderMap, HeaderValue};
    use http_body_04x::{Body, SizeHint};
    use pin_project_lite::pin_project;

    use std::io::Read;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::Duration;

    pin_project! {
        // A body that yields its `Some` parts in order and returns
        // `Poll::Pending` (waking itself after a delay) for every `None` part.
        struct SputteringBody {
            parts: Vec<Option<Bytes>>,
            cursor: usize,
            delay_in_millis: u64,
        }
    }

    impl SputteringBody {
        // Total number of data bytes across all `Some` parts.
        fn len(&self) -> usize {
            self.parts.iter().flatten().map(|b| b.len()).sum()
        }
    }

    impl Body for SputteringBody {
        type Data = Bytes;
        type Error = aws_smithy_types::body::Error;

        fn poll_data(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
            // All parts consumed: end of stream.
            if self.cursor == self.parts.len() {
                return Poll::Ready(None);
            }

            let this = self.project();
            let delay_in_millis = *this.delay_in_millis;
            let next_part = this.parts.get_mut(*this.cursor).unwrap().take();

            match next_part {
                None => {
                    // Simulate a stall: report Pending and schedule a wake-up
                    // once the delay has elapsed.
                    *this.cursor += 1;
                    let waker = cx.waker().clone();
                    tokio::spawn(async move {
                        tokio::time::sleep(Duration::from_millis(delay_in_millis)).await;
                        waker.wake();
                    });
                    Poll::Pending
                }
                Some(data) => {
                    *this.cursor += 1;
                    Poll::Ready(Some(Ok(data)))
                }
            }
        }

        fn poll_trailers(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
            Poll::Ready(Ok(None))
        }

        fn is_end_stream(&self) -> bool {
            false
        }

        fn size_hint(&self) -> SizeHint {
            SizeHint::new()
        }
    }

    // A body that is immediately available is encoded as a single chunk.
    #[tokio::test]
    async fn test_aws_chunked_encoding() {
        let test_fut = async {
            let input_str = "Hello world";
            let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, Vec::new());
            let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);

            let mut output = SegmentedBuf::new();
            while let Some(buf) = body.data().await {
                output.push(buf.unwrap());
            }

            let mut actual_output = String::new();
            output
                .reader()
                .read_to_string(&mut actual_output)
                .expect("Doesn't cause IO errors");

            // "B" is the hex length of "Hello world" (11 bytes).
            let expected_output = "B\r\nHello world\r\n0\r\n\r\n";

            assert_eq!(expected_output, actual_output);
            assert!(
                body.trailers()
                    .await
                    .expect("no errors occurred during trailer polling")
                    .is_none(),
                "aws-chunked encoded bodies don't have normal HTTP trailers"
            );
        };

        // Guard against the encoder never finishing.
        let timeout_duration = Duration::from_secs(3);
        if tokio::time::timeout(timeout_duration, test_fut)
            .await
            .is_err()
        {
            panic!("test_aws_chunked_encoding timed out after {timeout_duration:?}");
        }
    }

    // A body that repeatedly returns `Pending` is still encoded as one chunk.
    #[tokio::test]
    async fn test_aws_chunked_encoding_sputtering_body() {
        let test_fut = async {
            let input = SputteringBody {
                parts: vec![
                    Some(Bytes::from_static(b"chunk 1, ")),
                    None,
                    Some(Bytes::from_static(b"chunk 2, ")),
                    Some(Bytes::from_static(b"chunk 3, ")),
                    None,
                    None,
                    Some(Bytes::from_static(b"chunk 4, ")),
                    Some(Bytes::from_static(b"chunk 5, ")),
                    Some(Bytes::from_static(b"chunk 6")),
                ],
                cursor: 0,
                delay_in_millis: 500,
            };
            let opts = AwsChunkedBodyOptions::new(input.len() as u64, Vec::new());
            let mut body = AwsChunkedBody::new(input, opts);

            let mut output = SegmentedBuf::new();
            while let Some(buf) = body.data().await {
                output.push(buf.unwrap());
            }

            let mut actual_output = String::new();
            output
                .reader()
                .read_to_string(&mut actual_output)
                .expect("Doesn't cause IO errors");

            // 0x34 = 52 bytes: five 9-byte parts plus one 7-byte part.
            let expected_output =
                "34\r\nchunk 1, chunk 2, chunk 3, chunk 4, chunk 5, chunk 6\r\n0\r\n\r\n";

            assert_eq!(expected_output, actual_output);
            assert!(
                body.trailers()
                    .await
                    .expect("no errors occurred during trailer polling")
                    .is_none(),
                "aws-chunked encoded bodies don't have normal HTTP trailers"
            );
        };

        // Three stalls of 500ms each must fit inside this budget.
        let timeout_duration = Duration::from_secs(3);
        if tokio::time::timeout(timeout_duration, test_fut)
            .await
            .is_err()
        {
            panic!(
                "test_aws_chunked_encoding_sputtering_body timed out after {timeout_duration:?}"
            );
        }
    }

    // Declaring a trailer that the inner body never produces must surface a
    // `ReportedTrailerLengthMismatch`. In the expected message, 44 is the
    // declared 42 bytes plus a CRLF and 0 is the rendered length of the
    // (absent) trailers.
    #[tokio::test]
    #[should_panic = "called `Result::unwrap()` on an `Err` value: ReportedTrailerLengthMismatch { actual: 44, expected: 0 }"]
    async fn test_aws_chunked_encoding_incorrect_trailer_length_panic() {
        let input_str = "Hello world";
        // The inner body has no trailers, so any non-zero length is wrong.
        let wrong_trailer_len = 42;
        let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![wrong_trailer_len]);
        let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);

        // The `unwrap` below is what panics once the trailers are polled.
        while let Some(buf) = body.data().await {
            drop(buf.unwrap());
        }

        assert!(
            body.trailers()
                .await
                .expect("no errors occurred during trailer polling")
                .is_none(),
            "aws-chunked encoded bodies don't have normal HTTP trailers"
        );
    }

    // An empty body is encoded as just the chunk terminator and closing CRLF.
    #[tokio::test]
    async fn test_aws_chunked_encoding_empty_body() {
        let input_str = "";
        let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, Vec::new());
        let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);

        let mut output = SegmentedBuf::new();
        while let Some(buf) = body.data().await {
            output.push(buf.unwrap());
        }

        let mut actual_output = String::new();
        output
            .reader()
            .read_to_string(&mut actual_output)
            .expect("Doesn't cause IO errors");

        let expected_output = [CHUNK_TERMINATOR, CRLF].concat();

        assert_eq!(expected_output, actual_output);
        assert!(
            body.trailers()
                .await
                .expect("no errors occurred during trailer polling")
                .is_none(),
            "aws-chunked encoded bodies don't have normal HTTP trailers"
        );
    }

    // The length helper must agree with the renderer, including for headers
    // with an empty value and headers with several values.
    #[tokio::test]
    async fn test_total_rendered_length_of_trailers() {
        let mut trailers = HeaderMap::new();

        trailers.insert("empty_value", HeaderValue::from_static(""));

        trailers.insert("single_value", HeaderValue::from_static("value 1"));

        trailers.insert("two_values", HeaderValue::from_static("value 1"));
        trailers.append("two_values", HeaderValue::from_static("value 2"));

        trailers.insert("three_values", HeaderValue::from_static("value 1"));
        trailers.append("three_values", HeaderValue::from_static("value 2"));
        trailers.append("three_values", HeaderValue::from_static("value 3"));

        let trailers = Some(trailers);
        let actual_length = total_rendered_length_of_trailers(trailers.as_ref());
        let expected_length =
            (trailers_as_aws_chunked_bytes(trailers, actual_length).len()) as u64;

        assert_eq!(expected_length, actual_length);
    }

    // Same agreement check for a present-but-empty trailer map.
    #[tokio::test]
    async fn test_total_rendered_length_of_empty_trailers() {
        let trailers = Some(HeaderMap::new());
        let actual_length = total_rendered_length_of_trailers(trailers.as_ref());
        let expected_length =
            (trailers_as_aws_chunked_bytes(trailers, actual_length).len()) as u64;

        assert_eq!(expected_length, actual_length);
    }
}
926
// Tests for the `http-body` 1.x implementation and its trailer helpers.
#[cfg(test)]
mod http_1x_tests {
    use super::super::{
        http_1x_utils::{total_rendered_length_of_trailers, trailers_as_aws_chunked_bytes},
        AwsChunkedBody, AwsChunkedBodyOptions, CHUNK_TERMINATOR_RAW, CRLF_RAW,
    };

    use aws_smithy_types::body::SdkBody;
    use bytes::{Buf, Bytes, BytesMut};
    use bytes_utils::SegmentedBuf;
    use http_1x::{HeaderMap, HeaderValue};
    use http_body_1x::{Body, Frame, SizeHint};
    use http_body_util::BodyExt;
    use pin_project_lite::pin_project;

    use std::io::Read;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::Duration;

    pin_project! {
        // A body that yields its `Some` parts in order as data frames and
        // returns `Poll::Pending` (waking itself after a delay) for every
        // `None` part.
        struct SputteringBody {
            parts: Vec<Option<Bytes>>,
            cursor: usize,
            delay_in_millis: u64,
        }
    }

    impl SputteringBody {
        // Total number of data bytes across all `Some` parts.
        fn len(&self) -> usize {
            self.parts.iter().flatten().map(|b| b.len()).sum()
        }
    }

    impl Body for SputteringBody {
        type Data = Bytes;
        type Error = aws_smithy_types::body::Error;

        fn poll_frame(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
            // All parts consumed: end of stream.
            if self.cursor == self.parts.len() {
                return Poll::Ready(None);
            }

            let this = self.project();
            let delay_in_millis = *this.delay_in_millis;
            let next_part = this.parts.get_mut(*this.cursor).unwrap().take();

            match next_part {
                None => {
                    // Simulate a stall: report Pending and schedule a wake-up
                    // once the delay has elapsed.
                    *this.cursor += 1;
                    let waker = cx.waker().clone();
                    tokio::spawn(async move {
                        tokio::time::sleep(Duration::from_millis(delay_in_millis)).await;
                        waker.wake();
                    });
                    Poll::Pending
                }
                Some(data) => {
                    *this.cursor += 1;
                    let frame = Frame::data(data);
                    Poll::Ready(Some(Ok(frame)))
                }
            }
        }

        fn is_end_stream(&self) -> bool {
            false
        }

        fn size_hint(&self) -> SizeHint {
            SizeHint::new()
        }
    }

    // A body that is immediately available is encoded as a single chunk.
    #[tokio::test]
    async fn test_aws_chunked_encoding() {
        let test_fut = async {
            let input_str = "Hello world";
            let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![]);
            let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);

            // Note: the loop stops silently at the first error frame; a
            // truncated output would then fail the comparison below.
            let mut output = SegmentedBuf::new();
            while let Some(Ok(buf)) = body.frame().await {
                output.push(buf.into_data().unwrap());
            }

            let mut actual_output = String::new();
            output
                .reader()
                .read_to_string(&mut actual_output)
                .expect("Doesn't cause IO errors");

            // "B" is the hex length of "Hello world" (11 bytes).
            let expected_output = "B\r\nHello world\r\n0\r\n\r\n";

            assert_eq!(expected_output, actual_output);
        };

        // Guard against the encoder never finishing.
        let timeout_duration = Duration::from_secs(3);
        if tokio::time::timeout(timeout_duration, test_fut)
            .await
            .is_err()
        {
            panic!("test_aws_chunked_encoding timed out after {timeout_duration:?}");
        }
    }

    // A body that repeatedly returns `Pending` is still encoded as one chunk.
    #[tokio::test]
    async fn test_aws_chunked_encoding_sputtering_body() {
        let test_fut = async {
            let input = SputteringBody {
                parts: vec![
                    Some(Bytes::from_static(b"chunk 1, ")),
                    None,
                    Some(Bytes::from_static(b"chunk 2, ")),
                    Some(Bytes::from_static(b"chunk 3, ")),
                    None,
                    None,
                    Some(Bytes::from_static(b"chunk 4, ")),
                    Some(Bytes::from_static(b"chunk 5, ")),
                    Some(Bytes::from_static(b"chunk 6")),
                ],
                cursor: 0,
                delay_in_millis: 500,
            };
            let opts = AwsChunkedBodyOptions::new(input.len() as u64, vec![]);
            let mut body = AwsChunkedBody::new(input, opts);

            let mut output = SegmentedBuf::new();
            while let Some(Ok(buf)) = body.frame().await {
                output.push(buf.into_data().unwrap());
            }

            let mut actual_output = String::new();
            output
                .reader()
                .read_to_string(&mut actual_output)
                .expect("Doesn't cause IO errors");

            // 0x34 = 52 bytes: five 9-byte parts plus one 7-byte part.
            let expected_output =
                "34\r\nchunk 1, chunk 2, chunk 3, chunk 4, chunk 5, chunk 6\r\n0\r\n\r\n";

            assert_eq!(expected_output, actual_output);
        };

        // Three stalls of 500ms each must fit inside this budget.
        let timeout_duration = Duration::from_secs(3);
        if tokio::time::timeout(timeout_duration, test_fut)
            .await
            .is_err()
        {
            panic!(
                "test_aws_chunked_encoding_sputtering_body timed out after {timeout_duration:?}"
            );
        }
    }

    // Declares a trailer length although the inner body has no trailers.
    // NOTE(review): despite the `_panic` suffix this test carries no
    // `should_panic` attribute; it only asserts that no trailer frame is ever
    // yielded, and the loop ends silently on the first error or `None`.
    #[tokio::test]
    async fn test_aws_chunked_encoding_incorrect_trailer_length_panic() {
        let input_str = "Hello world";
        // The inner body has no trailers, so any non-zero length is wrong.
        let wrong_trailer_len = 42;
        let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![wrong_trailer_len]);
        let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);

        while let Some(Ok(frame)) = body.frame().await {
            // Everything the encoder emits is a data frame.
            assert!(!frame.is_trailers());
        }
    }

    // An empty body is encoded as just the chunk terminator and closing CRLF.
    #[tokio::test]
    async fn test_aws_chunked_encoding_empty_body() {
        let input_str = "";
        let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![]);
        let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);

        let mut output = SegmentedBuf::new();
        while let Some(Ok(frame)) = body.frame().await {
            output.push(frame.into_data().unwrap());
        }

        let mut actual_output = String::new();
        output
            .reader()
            .read_to_string(&mut actual_output)
            .expect("Doesn't cause IO errors");

        // Both sides are brought to `&str` so the assertion compares text.
        let actual_output = std::str::from_utf8(actual_output.as_bytes()).unwrap();
        let expected_output = [CHUNK_TERMINATOR_RAW, CRLF_RAW].concat();
        let expected_output = std::str::from_utf8(&expected_output).unwrap();

        assert_eq!(expected_output, actual_output);
    }

    // The length helper must agree with the renderer, including for headers
    // with an empty value and headers with several values.
    #[tokio::test]
    async fn test_total_rendered_length_of_trailers() {
        let mut trailers = HeaderMap::new();

        trailers.insert("empty_value", HeaderValue::from_static(""));

        trailers.insert("single_value", HeaderValue::from_static("value 1"));

        trailers.insert("two_values", HeaderValue::from_static("value 1"));
        trailers.append("two_values", HeaderValue::from_static("value 2"));

        trailers.insert("three_values", HeaderValue::from_static("value 1"));
        trailers.append("three_values", HeaderValue::from_static("value 2"));
        trailers.append("three_values", HeaderValue::from_static("value 3"));

        let trailers = Some(&trailers);
        let actual_length = total_rendered_length_of_trailers(trailers);
        let buf = BytesMut::with_capacity(actual_length as usize);
        let expected_length = (trailers_as_aws_chunked_bytes(trailers, buf).len()) as u64;

        assert_eq!(expected_length, actual_length);
    }

    // Same agreement check for a present-but-empty trailer map.
    #[tokio::test]
    async fn test_total_rendered_length_of_empty_trailers() {
        let header_map = HeaderMap::new();
        let trailers = Some(&header_map);
        let actual_length = total_rendered_length_of_trailers(trailers);
        let buf = BytesMut::with_capacity(actual_length as usize);
        let expected_length = (trailers_as_aws_chunked_bytes(trailers, buf).len()) as u64;

        assert_eq!(expected_length, actual_length);
    }
}
1161}