1use bytes::{Buf, Bytes, BytesMut};
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use crate::content_encoding::body::{AwsChunkedBody, AwsChunkedBodyError, AwsChunkedBodyState};
11use crate::content_encoding::{
12 header, SignChunk, CHUNK_SIGNATURE_BEGIN_RAW, CHUNK_TERMINATOR_RAW, CRLF_RAW, TRAILER_SEPARATOR,
13};
14use aws_sigv4::http_request::SigningError;
15use aws_smithy_runtime_api::http::Headers;
16
/// Expands to a mutable reference to the chunk signer held by a projected `AwsChunkedBody`.
///
/// The argument must be an expression with a `signer` field (in this file, the value returned
/// by `self.project()`). The expansion walks through the pinned, optional wrapper down to the
/// signer object stored in the wrapper's first tuple field.
///
/// # Panics
///
/// Panics with "signer must be set" when the `signer` field holds `None`.
macro_rules! signer_mut {
    ($projection:expr) => {
        $projection
            .signer
            .as_mut()
            .get_mut()
            .as_mut()
            .expect("signer must be set")
            .0
            .as_mut()
    };
}
29
30impl<Inner> http_body_1x::Body for AwsChunkedBody<Inner>
31where
32 Inner: http_body_1x::Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
33{
34 type Data = Bytes;
35 type Error = aws_smithy_types::body::Error;
36
37 fn is_end_stream(&self) -> bool {
38 self.state == AwsChunkedBodyState::Closed
39 }
40
41 fn size_hint(&self) -> http_body_1x::SizeHint {
42 http_body_1x::SizeHint::with_exact(self.options.encoded_length())
43 }
44
45 fn poll_frame(
46 self: Pin<&mut Self>,
47 cx: &mut Context<'_>,
48 ) -> Poll<Option<Result<http_body_1x::Frame<Self::Data>, Self::Error>>> {
49 tracing::trace!(state = ?self.state, "polling AwsChunkedBody");
50 let mut this = self.project();
51 let chunk_size = this.options.chunk_size();
52
53 use AwsChunkedBodyState::*;
54 match *this.state {
55 WritingChunk => {
56 while !this.chunk_buffer.is_eos() {
57 if this.chunk_buffer.remaining() >= chunk_size {
58 let buf = this.chunk_buffer.buffered();
59 let chunk_bytes = buf.copy_to_bytes(chunk_size);
60 let chunk = if this.options.is_signed {
61 let signer = signer_mut!(this);
62 signed_encoded_chunk(signer, chunk_bytes).map_err(|e| {
63 Box::new(AwsChunkedBodyError::FailedToSign { source: e })
64 })?
65 } else {
66 unsigned_encoded_chunk(chunk_bytes)
67 };
68 *this.inner_body_bytes_read_so_far += chunk_size;
69 tracing::trace!("writing chunk data: {:#?}", chunk);
70 return Poll::Ready(Some(Ok(http_body_1x::Frame::data(chunk))));
71 }
72
73 match Self::buffer_next_chunk(
74 this.inner.as_mut(),
75 this.chunk_buffer.as_mut(),
76 this.buffered_trailing_headers.as_mut(),
77 cx,
78 ) {
79 Poll::Ready(Ok(true)) => continue,
80 Poll::Ready(Ok(false)) => break,
81 Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
82 Poll::Pending => return Poll::Pending,
83 }
84 }
85
86 if this.chunk_buffer.remaining() > 0 {
87 let bytes_len_to_read =
88 std::cmp::min(this.chunk_buffer.remaining(), chunk_size);
89 let buf = this.chunk_buffer.buffered();
90 let chunk_bytes = buf.copy_to_bytes(bytes_len_to_read);
91 let chunk = if this.options.is_signed {
92 let signer = signer_mut!(this);
93 signed_encoded_chunk(signer, chunk_bytes).map_err(|e| {
94 Box::new(AwsChunkedBodyError::FailedToSign { source: e })
95 })?
96 } else {
97 unsigned_encoded_chunk(chunk_bytes)
98 };
99 *this.inner_body_bytes_read_so_far += bytes_len_to_read;
100 tracing::trace!("remaining chunk data: {:#?}", chunk);
101 return Poll::Ready(Some(Ok(http_body_1x::Frame::data(chunk))));
102 }
103
104 debug_assert!(this.chunk_buffer.remaining() == 0);
105
106 if let Err(poll_stream_len_err) = check_for_stream_length_mismatch(
108 *this.inner_body_bytes_read_so_far as u64,
109 this.options.stream_length,
110 ) {
111 return poll_stream_len_err;
112 }
113
114 if this.options.is_signed {
115 *this.state = WritingZeroSizedSignedChunk;
116 } else {
117 *this.state = PollingTrailers;
118 }
119 cx.waker().wake_by_ref();
122 Poll::Pending
123 }
124 WritingZeroSizedSignedChunk => {
125 let signer = signer_mut!(this);
126 let zero_sized_chunk = signed_encoded_chunk(signer, Bytes::new())
127 .map_err(|e| Box::new(AwsChunkedBodyError::FailedToSign { source: e }))?;
128 if this.buffered_trailing_headers.is_some() {
129 *this.state = PollingTrailers;
130 let mut zero_sized_chunk = BytesMut::from(&zero_sized_chunk[..]);
131 debug_assert!(zero_sized_chunk.ends_with(b"\r\n\r\n"));
132 zero_sized_chunk.truncate(zero_sized_chunk.len() - 2);
134 let zero_sized_chunk = zero_sized_chunk.freeze();
135 tracing::trace!("writing zero sized signed chunk: {:#?}", zero_sized_chunk);
136 Poll::Ready(Some(Ok(http_body_1x::Frame::data(zero_sized_chunk))))
137 } else {
138 *this.state = Closed;
139 tracing::trace!(
140 "writing zero sized signed chunk without trailer: {:#?}",
141 zero_sized_chunk
142 );
143 Poll::Ready(Some(Ok(http_body_1x::Frame::data(zero_sized_chunk))))
144 }
145 }
146 PollingTrailers => match this.inner.as_mut().poll_frame(cx) {
147 Poll::Ready(Some(Ok(frame))) => {
148 let trailers = frame.into_trailers().ok();
149 if let Some(trailers) = trailers {
150 match this.buffered_trailing_headers.as_mut().get_mut() {
151 Some(existing) => existing.extend(trailers),
152 None => {
153 *this.buffered_trailing_headers.as_mut().get_mut() = Some(trailers)
154 }
155 }
156 }
157 cx.waker().wake_by_ref();
160 Poll::Pending
161 }
162 Poll::Ready(Some(Err(err))) => {
163 tracing::error!(error = ?err, "error polling inner");
164 Poll::Ready(Some(Err(err)))
165 }
166 Poll::Ready(None) => {
167 *this.state = WritingTrailers;
168 cx.waker().wake_by_ref();
171 Poll::Pending
172 }
173 Poll::Pending => Poll::Pending,
174 },
175 WritingTrailers => {
176 let mut final_chunk = if this.options.is_signed {
177 BytesMut::new()
178 } else {
179 BytesMut::from(CHUNK_TERMINATOR_RAW)
180 };
181
182 let trailer_bytes = if let Some(mut trailer) = this.buffered_trailing_headers.take()
183 {
184 let mut trailer_bytes = BytesMut::new();
185 let trailer = if this.options.is_signed && !trailer.is_empty() {
186 let signer = signer_mut!(this);
187 let signature = signer
188 .trailer_signature(&Headers::try_from(trailer.clone())?)
189 .map_err(|e| {
190 Box::new(AwsChunkedBodyError::FailedToSign { source: e })
191 })?;
192 trailer.insert(
193 http_1x::header::HeaderName::from_static(
194 header::X_AMZ_TRAILER_SIGNATURE,
195 ),
196 http_1x::header::HeaderValue::from_str(&signature).unwrap(),
197 );
198 trailer
199 } else {
200 trailer
201 };
202
203 let actual_length: u64 = total_rendered_length_of_trailers(Some(&trailer));
204 let expected_length = this.options.total_trailer_length();
205 if expected_length != actual_length {
206 let err = AwsChunkedBodyError::ReportedTrailerLengthMismatch {
207 actual: actual_length,
208 expected: expected_length,
209 };
210 return Poll::Ready(Some(Err(err.into())));
211 }
212
213 trailer_bytes = trailers_as_aws_chunked_bytes(Some(&trailer), trailer_bytes);
214 trailer_bytes.freeze()
215 } else {
216 Bytes::new()
217 };
218
219 *this.state = Closed;
220
221 if final_chunk.is_empty() && trailer_bytes.is_empty() {
222 return Poll::Ready(None);
224 }
225
226 final_chunk.extend_from_slice(&trailer_bytes);
227 final_chunk.extend_from_slice(CRLF_RAW);
228
229 tracing::trace!("final chunk: {:#?}", final_chunk);
230 Poll::Ready(Some(Ok(http_body_1x::Frame::data(final_chunk.freeze()))))
231 }
232 Closed => Poll::Ready(None),
233 #[allow(unreachable_patterns)]
234 ref otherwise => {
236 unreachable!(
237 "invalid state {otherwise:?} for `poll_frame` in http-1x; this is a bug"
238 )
239 }
240 }
241 }
242}
243
244fn signed_encoded_chunk(
245 signer: &mut (dyn SignChunk + Send + Sync),
246 chunk_bytes: Bytes,
247) -> Result<Bytes, SigningError> {
248 let chunk_size = format!("{:X}", chunk_bytes.len());
249 let mut chunk = BytesMut::new();
250 chunk.extend_from_slice(chunk_size.as_bytes());
251 chunk.extend_from_slice(CHUNK_SIGNATURE_BEGIN_RAW);
252 chunk.extend_from_slice(signer.chunk_signature(&chunk_bytes)?.as_bytes());
253 chunk.extend_from_slice(CRLF_RAW);
254 chunk.extend_from_slice(&chunk_bytes);
255 chunk.extend_from_slice(CRLF_RAW);
256 Ok(chunk.freeze())
257}
258
259fn unsigned_encoded_chunk(chunk_bytes: Bytes) -> Bytes {
260 let chunk_size = format!("{:X}", chunk_bytes.len());
261 let mut chunk = BytesMut::new();
262 chunk.extend_from_slice(chunk_size.as_bytes());
263 chunk.extend_from_slice(CRLF_RAW);
264 chunk.extend_from_slice(&chunk_bytes);
265 chunk.extend_from_slice(CRLF_RAW);
266 chunk.freeze()
267}
268
269fn trailers_as_aws_chunked_bytes(
275 trailer_map: Option<&http_1x::HeaderMap>,
276 mut buffer: BytesMut,
277) -> BytesMut {
278 if let Some(trailer_map) = trailer_map {
279 let mut current_header_name: Option<http_1x::header::HeaderName> = None;
280
281 for (header_name, header_value) in trailer_map.clone().into_iter() {
282 current_header_name = header_name.or(current_header_name);
286
287 if let Some(header_name) = current_header_name.as_ref() {
289 buffer.extend_from_slice(header_name.as_ref());
290 buffer.extend_from_slice(TRAILER_SEPARATOR);
291 buffer.extend_from_slice(header_value.as_bytes());
292 buffer.extend_from_slice(CRLF_RAW);
293 }
294 }
295
296 buffer
297 } else {
298 buffer
299 }
300}
301
302fn total_rendered_length_of_trailers(trailer_map: Option<&http_1x::HeaderMap>) -> u64 {
309 match trailer_map {
310 Some(trailer_map) => trailer_map
311 .iter()
312 .map(|(trailer_name, trailer_value)| {
313 trailer_name.as_str().len()
314 + TRAILER_SEPARATOR.len()
315 + trailer_value.len()
316 + CRLF_RAW.len()
317 })
318 .sum::<usize>() as u64,
319 None => 0,
320 }
321}
322
323#[allow(clippy::type_complexity)]
326fn check_for_stream_length_mismatch(
327 actual_stream_length: u64,
328 expected_stream_length: u64,
329) -> Result<(), Poll<Option<Result<http_body_1x::Frame<Bytes>, aws_smithy_types::body::Error>>>> {
330 if actual_stream_length != expected_stream_length {
331 let err = Box::new(AwsChunkedBodyError::StreamLengthMismatch {
332 actual: actual_stream_length,
333 expected: expected_stream_length,
334 });
335 return Err(Poll::Ready(Some(Err(err))));
336 };
337
338 Ok(())
339}
340
#[cfg(test)]
mod tests {
    use super::{total_rendered_length_of_trailers, trailers_as_aws_chunked_bytes};
    use crate::content_encoding::{
        AwsChunkedBody, AwsChunkedBodyOptions, CHUNK_TERMINATOR_RAW, CRLF_RAW,
        DEFAULT_CHUNK_SIZE_BYTE,
    };

    use aws_smithy_types::body::SdkBody;
    use bytes::{Buf, Bytes, BytesMut};
    use bytes_utils::SegmentedBuf;
    use http_1x::{HeaderMap, HeaderValue};
    use http_body_1x::{Body, Frame, SizeHint};
    use http_body_util::BodyExt;
    use pin_project_lite::pin_project;

    use std::fmt::Debug;
    use std::future::Future;
    use std::io::Read;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::Duration;

    pin_project! {
        // Body that yields its parts one at a time. A `None` part makes the body return
        // `Poll::Pending` and wake the task again after `delay_in_millis` milliseconds.
        struct SputteringBody {
            parts: Vec<Option<Bytes>>,
            cursor: usize,
            delay_in_millis: u64,
        }
    }

    impl SputteringBody {
        /// Total number of data bytes across all `Some` parts.
        fn len(&self) -> usize {
            self.parts.iter().flatten().map(|b| b.len()).sum()
        }
    }

    impl Body for SputteringBody {
        type Data = Bytes;
        type Error = aws_smithy_types::body::Error;

        fn poll_frame(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
            if self.cursor == self.parts.len() {
                return Poll::Ready(None);
            }

            let this = self.project();
            let delay_in_millis = *this.delay_in_millis;
            let next_part = this.parts.get_mut(*this.cursor).unwrap().take();

            match next_part {
                None => {
                    *this.cursor += 1;
                    // Simulate a stalled body: wake the task up later from a spawned timer.
                    let waker = cx.waker().clone();
                    tokio::spawn(async move {
                        tokio::time::sleep(Duration::from_millis(delay_in_millis)).await;
                        waker.wake();
                    });
                    Poll::Pending
                }
                Some(data) => {
                    *this.cursor += 1;
                    let frame = Frame::data(data);
                    Poll::Ready(Some(Ok(frame)))
                }
            }
        }

        fn is_end_stream(&self) -> bool {
            false
        }

        fn size_hint(&self) -> SizeHint {
            SizeHint::new()
        }
    }

    pin_project! {
        // Body that yields one optional data frame, then one optional trailers frame, then ends.
        struct TestBodyWithTrailers {
            data: Option<Bytes>,
            trailers: Option<HeaderMap>,
        }
    }

    impl Body for TestBodyWithTrailers {
        type Data = Bytes;
        type Error = aws_smithy_types::body::Error;

        fn poll_frame(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Option<Result<http_body_1x::Frame<Self::Data>, Self::Error>>> {
            let this = self.project();

            if let Some(data) = this.data.take() {
                return Poll::Ready(Some(Ok(http_body_1x::Frame::data(data))));
            }

            if let Some(trailers) = this.trailers.take() {
                return Poll::Ready(Some(Ok(http_body_1x::Frame::trailers(trailers))));
            }

            Poll::Ready(None)
        }
    }

    /// Reads `body` until it ends or yields an error and returns everything read as a string.
    ///
    /// Reading stops silently at the first error frame. Panics if a successfully read frame is
    /// not a data frame.
    async fn read_body_to_string<B>(mut body: B) -> String
    where
        B: Body<Data = Bytes> + Unpin,
    {
        let mut output: SegmentedBuf<Bytes> = SegmentedBuf::new();
        while let Some(Ok(frame)) = body.frame().await {
            output.push(frame.into_data().unwrap());
        }

        let mut rendered = String::new();
        output
            .reader()
            .read_to_string(&mut rendered)
            .expect("Doesn't cause IO errors");
        rendered
    }

    /// Reads `body` to the end and returns the payload of every data frame, in order.
    ///
    /// Non-data frames are skipped. Panics if the body yields an error.
    async fn collect_data_frames<B>(mut body: B) -> Vec<Bytes>
    where
        B: Body<Data = Bytes> + Unpin,
        B::Error: Debug,
    {
        let mut data_frames = Vec::new();
        while let Some(frame) = body.frame().await.transpose().unwrap() {
            if let Ok(data) = frame.into_data() {
                data_frames.push(data);
            }
        }
        data_frames
    }

    /// Runs `test_fut`, panicking if it has not completed within three seconds.
    async fn run_with_timeout(test_name: &str, test_fut: impl Future<Output = ()>) {
        let timeout_duration = Duration::from_secs(3);
        if tokio::time::timeout(timeout_duration, test_fut)
            .await
            .is_err()
        {
            panic!("{test_name} timed out after {timeout_duration:?}");
        }
    }

    #[tokio::test]
    async fn test_aws_chunked_encoding() {
        run_with_timeout("test_aws_chunked_encoding", async {
            let input_str = "Hello world";
            let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![]);
            let body = AwsChunkedBody::new(SdkBody::from(input_str), opts);

            let actual_output = read_body_to_string(body).await;
            let expected_output = "B\r\nHello world\r\n0\r\n\r\n";

            assert_eq!(expected_output, actual_output);
        })
        .await;
    }

    #[tokio::test]
    async fn test_aws_chunked_encoding_sputtering_body() {
        run_with_timeout("test_aws_chunked_encoding_sputtering_body", async {
            let input = SputteringBody {
                parts: vec![
                    Some(Bytes::from_static(b"chunk 1, ")),
                    None,
                    Some(Bytes::from_static(b"chunk 2, ")),
                    Some(Bytes::from_static(b"chunk 3, ")),
                    None,
                    None,
                    Some(Bytes::from_static(b"chunk 4, ")),
                    Some(Bytes::from_static(b"chunk 5, ")),
                    Some(Bytes::from_static(b"chunk 6")),
                ],
                cursor: 0,
                delay_in_millis: 500,
            };
            let opts = AwsChunkedBodyOptions::new(input.len() as u64, vec![]);
            let body = AwsChunkedBody::new(input, opts);

            let actual_output = read_body_to_string(body).await;
            let expected_output =
                "34\r\nchunk 1, chunk 2, chunk 3, chunk 4, chunk 5, chunk 6\r\n0\r\n\r\n";

            assert_eq!(expected_output, actual_output);
        })
        .await;
    }

    // NOTE(review): despite its name this test carries no `#[should_panic]`, and the loop below
    // stops silently at the first error frame. As written it only asserts that no trailers frame
    // is yielded; confirm whether a trailer-length mismatch error was meant to be asserted.
    #[tokio::test]
    async fn test_aws_chunked_encoding_incorrect_trailer_length_panic() {
        let input_str = "Hello world";
        // The inner body has no trailers, so this advertised trailer length is wrong.
        let wrong_trailer_len = 42;
        let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![wrong_trailer_len]);
        let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);

        while let Some(Ok(frame)) = body.frame().await {
            assert!(!frame.is_trailers());
        }
    }

    #[tokio::test]
    async fn test_aws_chunked_encoding_empty_body() {
        let input_str = "";
        let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, vec![]);
        let body = AwsChunkedBody::new(SdkBody::from(input_str), opts);

        let actual_output = read_body_to_string(body).await;

        // An empty body renders as just the chunk terminator followed by the final CRLF.
        let expected_output = [CHUNK_TERMINATOR_RAW, CRLF_RAW].concat();
        let expected_output = std::str::from_utf8(&expected_output).unwrap();

        assert_eq!(expected_output, actual_output);
    }

    #[tokio::test]
    async fn test_total_rendered_length_of_trailers() {
        let mut trailers = HeaderMap::new();

        trailers.insert("empty_value", HeaderValue::from_static(""));

        trailers.insert("single_value", HeaderValue::from_static("value 1"));

        trailers.insert("two_values", HeaderValue::from_static("value 1"));
        trailers.append("two_values", HeaderValue::from_static("value 2"));

        trailers.insert("three_values", HeaderValue::from_static("value 1"));
        trailers.append("three_values", HeaderValue::from_static("value 2"));
        trailers.append("three_values", HeaderValue::from_static("value 3"));

        // The computed length must agree with the number of bytes actually rendered.
        let trailers = Some(&trailers);
        let actual_length = total_rendered_length_of_trailers(trailers);
        let buf = BytesMut::with_capacity(actual_length as usize);
        let expected_length = (trailers_as_aws_chunked_bytes(trailers, buf).len()) as u64;

        assert_eq!(expected_length, actual_length);
    }

    #[tokio::test]
    async fn test_total_rendered_length_of_empty_trailers() {
        let header_map = HeaderMap::new();
        let trailers = Some(&header_map);
        let actual_length = total_rendered_length_of_trailers(trailers);
        let buf = BytesMut::with_capacity(actual_length as usize);
        let expected_length = (trailers_as_aws_chunked_bytes(trailers, buf).len()) as u64;

        assert_eq!(expected_length, actual_length);
    }

    #[tokio::test]
    async fn test_poll_frame_with_default_chunk_size() {
        let test_data = Bytes::from("1234567890123456789012345");
        let body = SdkBody::from(test_data.clone());
        let options = AwsChunkedBodyOptions::new(test_data.len() as u64, vec![]);
        let chunked_body = AwsChunkedBody::new(body, options);

        let data_frames = collect_data_frames(chunked_body).await;

        // One data chunk (the input is smaller than the default chunk size) plus the final chunk.
        assert_eq!(data_frames.len(), 2);
        assert_eq!(
            Bytes::from_static(b"19\r\n1234567890123456789012345\r\n"),
            data_frames[0]
        );
        assert_eq!(Bytes::from_static(b"0\r\n\r\n"), data_frames[1]);
    }

    #[tokio::test]
    async fn test_poll_frame_with_custom_chunk_size() {
        let test_data = Bytes::from("1234567890123456789012345");
        let body = SdkBody::from(test_data.clone());
        let options =
            AwsChunkedBodyOptions::new(test_data.len() as u64, vec![]).with_chunk_size(10);
        let chunked_body = AwsChunkedBody::new(body, options);

        let data_frames = collect_data_frames(chunked_body).await;

        // 25 bytes at a chunk size of 10: two full chunks, one 5-byte chunk, and the final chunk.
        assert_eq!(4, data_frames.len());
        assert_eq!(Bytes::from_static(b"A\r\n1234567890\r\n"), data_frames[0]);
        assert_eq!(Bytes::from_static(b"A\r\n1234567890\r\n"), data_frames[1]);
        assert_eq!(Bytes::from_static(b"5\r\n12345\r\n"), data_frames[2]);
        assert_eq!(Bytes::from_static(b"0\r\n\r\n"), data_frames[3]);
    }

    #[tokio::test]
    async fn test_poll_frame_with_trailers() {
        let data = Bytes::from("1234567890123456789012345");
        let stream_len = data.len() as u64;
        let mut trailers = HeaderMap::new();
        trailers.insert("x-amz-checksum-crc32", HeaderValue::from_static("78DeVw=="));
        let body = TestBodyWithTrailers {
            data: Some(data),
            trailers: Some(trailers),
        };
        // 29 is the length of "x-amz-checksum-crc32:78DeVw==" without its trailing CRLF.
        let options = AwsChunkedBodyOptions::new(stream_len, vec![29]).with_chunk_size(10);
        let chunked_body = AwsChunkedBody::new(body, options);

        let data_frames = collect_data_frames(chunked_body).await;

        // Three data chunks plus the final chunk, which carries the trailer.
        assert_eq!(4, data_frames.len());
        assert_eq!(Bytes::from_static(b"A\r\n1234567890\r\n"), data_frames[0]);
        assert_eq!(Bytes::from_static(b"A\r\n1234567890\r\n"), data_frames[1]);
        assert_eq!(Bytes::from_static(b"5\r\n12345\r\n"), data_frames[2]);
        assert_eq!(
            Bytes::from_static(b"0\r\nx-amz-checksum-crc32:78DeVw==\r\n\r\n"),
            data_frames[3]
        );
    }

    #[tokio::test]
    async fn test_aws_chunked_body_poll_frame_with_signer() {
        use crate::auth::sigv4::SigV4MessageSigner;
        use aws_credential_types::Credentials;
        use aws_sigv4::http_request::SigningSettings;
        use aws_smithy_async::time::{SharedTimeSource, StaticTimeSource};
        use aws_types::region::SigningRegion;
        use aws_types::SigningName;
        use std::time::{Duration, UNIX_EPOCH};

        // 65 KiB of data: one full default-sized chunk plus a 1 KiB remainder.
        let data = "a".repeat(65 * 1024);
        let stream_len = data.len() as u64;
        let inner_body = SdkBody::from(data);

        // A fixed clock keeps the expected signatures below deterministic.
        let time = StaticTimeSource::new(UNIX_EPOCH + Duration::from_secs(1369353600));
        let shared_time = SharedTimeSource::from(time);

        let credentials = Credentials::new(
            "AKIAIOSFODNN7EXAMPLE",
            "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
            None,
            None,
            "test",
        );

        let seed_signature =
            "4f232c4386841ef735655705268965c44a0e4690baa4adea153f7db9fa80a0a9".to_owned();
        let signer = SigV4MessageSigner::new(
            seed_signature,
            credentials.into(),
            SigningRegion::from_static("us-east-1"),
            SigningName::from_static("s3"),
            shared_time,
            SigningSettings::default(),
        );

        let opt = AwsChunkedBodyOptions::new(stream_len, vec![]).signed_chunked_encoding(true);
        let chunked_body = AwsChunkedBody::new(inner_body, opt).with_signer(signer);

        let data_frames = collect_data_frames(chunked_body).await;

        // Two signed data chunks followed by the signed zero-sized chunk.
        assert_eq!(3, data_frames.len());
        assert!(data_frames[0].starts_with(b"10000;chunk-signature=ad80c730a21e5b8d04586a2213dd63b9a0e99e0e2307b0ade35a65485a288648\r\n"));
        assert!(data_frames[1].starts_with(b"400;chunk-signature=0055627c9e194cb4542bae2aa5492e3c1575bbb81b612b7d234b86a503ef5497\r\n"));
        assert_eq!(data_frames[2], Bytes::from_static(b"0;chunk-signature=b6c6ea8a5354eaf15b3cb7646744f4275b71ea724fed81ceb9323e279d449df9\r\n\r\n"));
    }

    #[tokio::test]
    async fn test_aws_chunked_body_poll_frame_with_signer_and_trailers() {
        use crate::auth::sigv4::SigV4MessageSigner;
        use aws_credential_types::Credentials;
        use aws_sigv4::http_request::SigningSettings;
        use aws_smithy_async::time::{SharedTimeSource, StaticTimeSource};
        use aws_types::region::SigningRegion;
        use aws_types::SigningName;
        use std::time::{Duration, UNIX_EPOCH};

        // 65 KiB of data: one full default-sized chunk plus a 1 KiB remainder.
        let data = "a".repeat(65 * 1024);
        let stream_len = data.len() as u64;

        let mut trailers = HeaderMap::new();
        trailers.insert(
            "x-amz-checksum-crc32c",
            HeaderValue::from_static("sOO8/Q=="),
        );

        let inner_body = TestBodyWithTrailers {
            data: Some(Bytes::from(data)),
            trailers: Some(trailers),
        };

        // A fixed clock keeps the expected signatures below deterministic.
        let time = StaticTimeSource::new(UNIX_EPOCH + Duration::from_secs(1369353600));
        let shared_time = SharedTimeSource::from(time);

        let credentials = Credentials::new(
            "AKIAIOSFODNN7EXAMPLE",
            "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
            None,
            None,
            "test",
        );

        let seed_signature =
            "106e2a8a18243abcf37539882f36619c00e2dfc72633413f02d3b74544bfeb8e".to_owned();
        let signer = SigV4MessageSigner::new(
            seed_signature,
            credentials.into(),
            SigningRegion::from_static("us-east-1"),
            SigningName::from_static("s3"),
            shared_time,
            SigningSettings::default(),
        );

        // 30 and 88 are the lengths of the checksum trailer and the trailer-signature header,
        // each without its trailing CRLF.
        let opt =
            AwsChunkedBodyOptions::new(stream_len, vec![30, 88]).signed_chunked_encoding(true);
        let chunked_body = AwsChunkedBody::new(inner_body, opt).with_signer(signer);

        let data_frames = collect_data_frames(chunked_body).await;

        // Two signed data chunks, the signed zero-sized chunk, then the signed trailers.
        assert_eq!(4, data_frames.len());
        assert!(data_frames[0].starts_with(b"10000;chunk-signature=b474d8862b1487a5145d686f57f013e54db672cee1c953b3010fb58501ef5aa2\r\n"));
        assert!(data_frames[1].starts_with(b"400;chunk-signature=1c1344b170168f8e65b41376b44b20fe354e373826ccbbe2c1d40a8cae51e5c7\r\n"));
        assert_eq!(data_frames[2], Bytes::from_static(b"0;chunk-signature=2ca2aba2005185cf7159c6277faf83795951dd77a3a99e6e65d5c9f85863f992\r\n"));
        assert_eq!(data_frames[3], Bytes::from_static(b"x-amz-checksum-crc32c:sOO8/Q==\r\nx-amz-trailer-signature:d81f82fc3505edab99d459891051a732e8730629a2e4a59689829ca17fe2e435\r\n\r\n"));
    }

    #[test]
    fn test_unsigned_encoded_length_with_no_trailer() {
        {
            // "A\r\n" (3) + 10 data bytes + "\r\n" (2) + "0\r\n" (3) + "\r\n" (2) = 20
            let options = AwsChunkedBodyOptions::new(10, vec![]);
            assert_eq!(options.encoded_length(), 20);
        }
        {
            // One full default-sized chunk, one 10-byte chunk, then the terminator and CRLF.
            let options = AwsChunkedBodyOptions::new((DEFAULT_CHUNK_SIZE_BYTE + 10) as u64, vec![]);
            assert_eq!(options.encoded_length(), 65565);
        }
    }

    #[test]
    fn test_unsigned_encoded_length_with_trailer() {
        // 15 for the data chunk + "0\r\n" (3) + 30-byte trailer + "\r\n" (2) + final "\r\n" (2) = 52
        let options = AwsChunkedBodyOptions::new(10, vec![30]);
        assert_eq!(options.encoded_length(), 52);
    }

    #[test]
    fn test_signed_encoded_length_with_no_trailer() {
        {
            let options = AwsChunkedBodyOptions::new(10, vec![]).signed_chunked_encoding(true);
            assert_eq!(options.encoded_length(), 182);
        }
        {
            let options = AwsChunkedBodyOptions::new((DEFAULT_CHUNK_SIZE_BYTE + 10) as u64, vec![])
                .signed_chunked_encoding(true);
            assert_eq!(options.encoded_length(), 65808);
        }
    }

    #[test]
    fn test_signed_encoded_length_with_trailer() {
        let options = AwsChunkedBodyOptions::new(10, vec![30, 88]).signed_chunked_encoding(true);
        assert_eq!(options.encoded_length(), 304);
    }
}