1use base64::Engine;
2use base64::engine::general_purpose;
3use hmac::Mac;
4use quick_xml::se::to_string;
5use std::collections::HashMap;
6#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
7use std::pin::Pin;
8use time::OffsetDateTime;
9use time::format_description::well_known::Rfc2822;
10use url::Url;
11
12use crate::LONG_DATETIME;
13use crate::bucket::Bucket;
14use crate::command::Command;
15use crate::error::S3Error;
16use crate::signing;
17use bytes::Bytes;
18use http::HeaderMap;
19use http::header::{
20 ACCEPT, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, DATE, HOST, HeaderName, RANGE,
21};
22use std::fmt::Write as _;
23
24#[cfg(feature = "with-async-std")]
25use async_std::stream::Stream;
26
27#[cfg(feature = "with-tokio")]
28use tokio_stream::Stream;
29
30#[derive(Debug)]
31
32pub struct ResponseData {
33 bytes: Bytes,
34 status_code: u16,
35 headers: HashMap<String, String>,
36}
37
38#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
39pub type DataStream = Pin<Box<dyn Stream<Item = StreamItem> + Send>>;
40#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
41pub type StreamItem = Result<Bytes, S3Error>;
42
43#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
44pub struct ResponseDataStream {
45 pub bytes: DataStream,
46 pub status_code: u16,
47}
48
49#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
50impl ResponseDataStream {
51 pub fn bytes(&mut self) -> &mut DataStream {
52 &mut self.bytes
53 }
54}
55
56impl From<ResponseData> for Vec<u8> {
57 fn from(data: ResponseData) -> Vec<u8> {
58 data.to_vec()
59 }
60}
61
62impl ResponseData {
63 pub fn new(bytes: Bytes, status_code: u16, headers: HashMap<String, String>) -> ResponseData {
64 ResponseData {
65 bytes,
66 status_code,
67 headers,
68 }
69 }
70
71 pub fn as_slice(&self) -> &[u8] {
72 &self.bytes
73 }
74
75 pub fn to_vec(self) -> Vec<u8> {
76 self.bytes.to_vec()
77 }
78
79 pub fn bytes(&self) -> &Bytes {
80 &self.bytes
81 }
82
83 pub fn bytes_mut(&mut self) -> &mut Bytes {
84 &mut self.bytes
85 }
86
87 pub fn into_bytes(self) -> Bytes {
88 self.bytes
89 }
90
91 pub fn status_code(&self) -> u16 {
92 self.status_code
93 }
94
95 pub fn as_str(&self) -> Result<&str, std::str::Utf8Error> {
96 std::str::from_utf8(self.as_slice())
97 }
98
99 pub fn to_string(&self) -> Result<String, std::str::Utf8Error> {
100 std::str::from_utf8(self.as_slice()).map(|s| s.to_string())
101 }
102
103 pub fn headers(&self) -> HashMap<String, String> {
104 self.headers.clone()
105 }
106}
107
108use std::fmt;
109
110impl fmt::Display for ResponseData {
111 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
112 write!(
113 f,
114 "Status code: {}\n Data: {}",
115 self.status_code(),
116 self.to_string()
117 .unwrap_or_else(|_| "Data could not be cast to UTF string".to_string())
118 )
119 }
120}
121
122#[cfg(feature = "with-tokio")]
123impl tokio::io::AsyncRead for ResponseDataStream {
124 fn poll_read(
125 mut self: Pin<&mut Self>,
126 cx: &mut std::task::Context<'_>,
127 buf: &mut tokio::io::ReadBuf<'_>,
128 ) -> std::task::Poll<std::io::Result<()>> {
129 match Stream::poll_next(self.bytes.as_mut(), cx) {
131 std::task::Poll::Ready(Some(Ok(chunk))) => {
132 let amt = std::cmp::min(chunk.len(), buf.remaining());
134 buf.put_slice(&chunk[..amt]);
135
136 std::task::Poll::Ready(Ok(()))
141 }
142 std::task::Poll::Ready(Some(Err(error))) => {
143 std::task::Poll::Ready(Err(std::io::Error::other(error)))
145 }
146 std::task::Poll::Ready(None) => {
147 std::task::Poll::Ready(Ok(()))
149 }
150 std::task::Poll::Pending => std::task::Poll::Pending,
151 }
152 }
153}
154
155#[cfg(feature = "with-async-std")]
156impl async_std::io::Read for ResponseDataStream {
157 fn poll_read(
158 mut self: Pin<&mut Self>,
159 cx: &mut std::task::Context<'_>,
160 buf: &mut [u8],
161 ) -> std::task::Poll<std::io::Result<usize>> {
162 match Stream::poll_next(self.bytes.as_mut(), cx) {
164 std::task::Poll::Ready(Some(Ok(chunk))) => {
165 let amt = std::cmp::min(chunk.len(), buf.len());
167 buf[..amt].copy_from_slice(&chunk[..amt]);
168
169 std::task::Poll::Ready(Ok(amt))
174 }
175 std::task::Poll::Ready(Some(Err(error))) => {
176 std::task::Poll::Ready(Err(std::io::Error::other(error)))
178 }
179 std::task::Poll::Ready(None) => {
180 std::task::Poll::Ready(Ok(0))
182 }
183 std::task::Poll::Pending => std::task::Poll::Pending,
184 }
185 }
186}
187
188#[maybe_async::maybe_async]
189pub trait Request {
190 type Response;
191 type HeaderMap;
192
193 async fn response(&self) -> Result<Self::Response, S3Error>;
194 async fn response_data(&self, etag: bool) -> Result<ResponseData, S3Error>;
195 #[cfg(feature = "with-tokio")]
196 async fn response_data_to_writer<T: tokio::io::AsyncWrite + Send + Unpin + ?Sized>(
197 &self,
198 writer: &mut T,
199 ) -> Result<u16, S3Error>;
200 #[cfg(feature = "with-async-std")]
201 async fn response_data_to_writer<T: async_std::io::Write + Send + Unpin + ?Sized>(
202 &self,
203 writer: &mut T,
204 ) -> Result<u16, S3Error>;
205 #[cfg(feature = "sync")]
206 fn response_data_to_writer<T: std::io::Write + Send + ?Sized>(
207 &self,
208 writer: &mut T,
209 ) -> Result<u16, S3Error>;
210 #[cfg(any(feature = "with-async-std", feature = "with-tokio"))]
211 async fn response_data_to_stream(&self) -> Result<ResponseDataStream, S3Error>;
212 async fn response_header(&self) -> Result<(Self::HeaderMap, u16), S3Error>;
213 fn datetime(&self) -> OffsetDateTime;
214 fn bucket(&self) -> Bucket;
215 fn command(&self) -> Command<'_>;
216 fn path(&self) -> String;
217
218 async fn signing_key(&self) -> Result<Vec<u8>, S3Error> {
219 signing::signing_key(
220 &self.datetime(),
221 &self
222 .bucket()
223 .secret_key()
224 .await?
225 .expect("Secret key must be provided to sign headers, found None"),
226 &self.bucket().region(),
227 "s3",
228 )
229 }
230
231 fn request_body(&self) -> Result<Vec<u8>, S3Error> {
232 let result = if let Command::PutObject { content, .. } = self.command() {
233 Vec::from(content)
234 } else if let Command::PutObjectTagging { tags } = self.command() {
235 Vec::from(tags)
236 } else if let Command::UploadPart { content, .. } = self.command() {
237 Vec::from(content)
238 } else if let Command::CompleteMultipartUpload { data, .. } = &self.command() {
239 let body = data.to_string();
240 body.as_bytes().to_vec()
241 } else if let Command::CreateBucket { config } = &self.command() {
242 if let Some(payload) = config.location_constraint_payload() {
243 Vec::from(payload)
244 } else {
245 Vec::new()
246 }
247 } else if let Command::PutBucketLifecycle { configuration, .. } = &self.command() {
248 quick_xml::se::to_string(configuration)?.as_bytes().to_vec()
249 } else if let Command::PutBucketCors { configuration, .. } = &self.command() {
250 let cors = configuration.to_string();
251 cors.as_bytes().to_vec()
252 } else {
253 Vec::new()
254 };
255 Ok(result)
256 }
257
258 fn long_date(&self) -> Result<String, S3Error> {
259 Ok(self.datetime().format(LONG_DATETIME)?)
260 }
261
262 fn string_to_sign(&self, request: &str) -> Result<String, S3Error> {
263 signing::string_to_sign(&self.datetime(), &self.bucket().region(), request)
264 }
265
266 fn host_header(&self) -> String {
267 self.bucket().host()
268 }
269
270 #[maybe_async::async_impl]
271 async fn presigned(&self) -> Result<String, S3Error> {
272 let (expiry, custom_headers, custom_queries) = match self.command() {
273 Command::PresignGet {
274 expiry_secs,
275 custom_queries,
276 } => (expiry_secs, None, custom_queries),
277 Command::PresignPut {
278 expiry_secs,
279 custom_headers,
280 custom_queries,
281 } => (expiry_secs, custom_headers, custom_queries),
282 Command::PresignDelete { expiry_secs } => (expiry_secs, None, None),
283 _ => unreachable!(),
284 };
285
286 let url = self
287 .presigned_url_no_sig(expiry, custom_headers.as_ref(), custom_queries.as_ref())
288 .await?;
289
290 let url_str = if let awsregion::Region::Custom { ref endpoint, .. } = self.bucket().region()
294 {
295 if (endpoint.contains(":80") && url.scheme() == "http" && url.port().is_none())
297 || (endpoint.contains(":443") && url.scheme() == "https" && url.port().is_none())
298 {
299 let host = self.bucket().host();
301 format!(
302 "{}://{}{}{}",
303 url.scheme(),
304 host,
305 url.path(),
306 url.query().map(|q| format!("?{}", q)).unwrap_or_default()
307 )
308 } else {
309 url.to_string()
310 }
311 } else {
312 url.to_string()
313 };
314
315 Ok(format!(
316 "{}&X-Amz-Signature={}",
317 url_str,
318 self.presigned_authorization(custom_headers.as_ref())
319 .await?
320 ))
321 }
322
323 #[maybe_async::sync_impl]
324 async fn presigned(&self) -> Result<String, S3Error> {
325 let (expiry, custom_headers, custom_queries) = match self.command() {
326 Command::PresignGet {
327 expiry_secs,
328 custom_queries,
329 } => (expiry_secs, None, custom_queries),
330 Command::PresignPut {
331 expiry_secs,
332 custom_headers,
333 ..
334 } => (expiry_secs, custom_headers, None),
335 Command::PresignDelete { expiry_secs } => (expiry_secs, None, None),
336 _ => unreachable!(),
337 };
338
339 let url =
340 self.presigned_url_no_sig(expiry, custom_headers.as_ref(), custom_queries.as_ref())?;
341
342 let url_str = if let awsregion::Region::Custom { ref endpoint, .. } = self.bucket().region()
346 {
347 if (endpoint.contains(":80") && url.scheme() == "http" && url.port().is_none())
349 || (endpoint.contains(":443") && url.scheme() == "https" && url.port().is_none())
350 {
351 let host = self.bucket().host();
353 format!(
354 "{}://{}{}{}",
355 url.scheme(),
356 host,
357 url.path(),
358 url.query().map(|q| format!("?{}", q)).unwrap_or_default()
359 )
360 } else {
361 url.to_string()
362 }
363 } else {
364 url.to_string()
365 };
366
367 Ok(format!(
368 "{}&X-Amz-Signature={}",
369 url_str,
370 self.presigned_authorization(custom_headers.as_ref())?
371 ))
372 }
373
374 async fn presigned_authorization(
375 &self,
376 custom_headers: Option<&HeaderMap>,
377 ) -> Result<String, S3Error> {
378 let mut headers = HeaderMap::new();
379 let host_header = self.host_header();
380 headers.insert(HOST, host_header.parse()?);
381 if let Some(custom_headers) = custom_headers {
382 for (k, v) in custom_headers.iter() {
383 headers.insert(k.clone(), v.clone());
384 }
385 }
386 let canonical_request = self.presigned_canonical_request(&headers).await?;
387 let string_to_sign = self.string_to_sign(&canonical_request)?;
388 let mut hmac = signing::HmacSha256::new_from_slice(&self.signing_key().await?)?;
389 hmac.update(string_to_sign.as_bytes());
390 let signature = hex::encode(hmac.finalize().into_bytes());
391 Ok(signature)
393 }
394
395 async fn presigned_canonical_request(&self, headers: &HeaderMap) -> Result<String, S3Error> {
396 let (expiry, custom_headers, custom_queries) = match self.command() {
397 Command::PresignGet {
398 expiry_secs,
399 custom_queries,
400 } => (expiry_secs, None, custom_queries),
401 Command::PresignPut {
402 expiry_secs,
403 custom_headers,
404 custom_queries,
405 } => (expiry_secs, custom_headers, custom_queries),
406 Command::PresignDelete { expiry_secs } => (expiry_secs, None, None),
407 _ => unreachable!(),
408 };
409
410 signing::canonical_request(
411 &self.command().http_verb().to_string(),
412 &self
413 .presigned_url_no_sig(expiry, custom_headers.as_ref(), custom_queries.as_ref())
414 .await?,
415 headers,
416 "UNSIGNED-PAYLOAD",
417 )
418 }
419
420 #[maybe_async::async_impl]
421 async fn presigned_url_no_sig(
422 &self,
423 expiry: u32,
424 custom_headers: Option<&HeaderMap>,
425 custom_queries: Option<&HashMap<String, String>>,
426 ) -> Result<Url, S3Error> {
427 let bucket = self.bucket();
428 let token = if let Some(security_token) = bucket.security_token().await? {
429 Some(security_token)
430 } else {
431 bucket.session_token().await?
432 };
433 let url = Url::parse(&format!(
434 "{}{}{}",
435 self.url()?,
436 &signing::authorization_query_params_no_sig(
437 &self.bucket().access_key().await?.unwrap_or_default(),
438 &self.datetime(),
439 &self.bucket().region(),
440 expiry,
441 custom_headers,
442 token.as_ref()
443 )?,
444 &signing::flatten_queries(custom_queries)?,
445 ))?;
446
447 Ok(url)
448 }
449
450 #[maybe_async::sync_impl]
451 fn presigned_url_no_sig(
452 &self,
453 expiry: u32,
454 custom_headers: Option<&HeaderMap>,
455 custom_queries: Option<&HashMap<String, String>>,
456 ) -> Result<Url, S3Error> {
457 let bucket = self.bucket();
458 let token = if let Some(security_token) = bucket.security_token()? {
459 Some(security_token)
460 } else {
461 bucket.session_token()?
462 };
463 let url = Url::parse(&format!(
464 "{}{}{}",
465 self.url()?,
466 &signing::authorization_query_params_no_sig(
467 &self.bucket().access_key()?.unwrap_or_default(),
468 &self.datetime(),
469 &self.bucket().region(),
470 expiry,
471 custom_headers,
472 token.as_ref()
473 )?,
474 &signing::flatten_queries(custom_queries)?,
475 ))?;
476
477 Ok(url)
478 }
479
480 fn url(&self) -> Result<Url, S3Error> {
481 let mut url_str = self.bucket().url();
482
483 if let Command::ListBuckets { .. } = self.command() {
484 return Ok(Url::parse(&url_str)?);
485 }
486
487 if let Command::CreateBucket { .. } = self.command() {
488 return Ok(Url::parse(&url_str)?);
489 }
490
491 let path = if self.path().starts_with('/') {
492 self.path()[1..].to_string()
493 } else {
494 self.path()[..].to_string()
495 };
496
497 url_str.push('/');
498 url_str.push_str(&signing::uri_encode(&path, false));
499
500 #[allow(clippy::collapsible_match)]
502 match self.command() {
503 Command::InitiateMultipartUpload { .. } | Command::ListMultipartUploads { .. } => {
504 url_str.push_str("?uploads")
505 }
506 Command::AbortMultipartUpload { upload_id } => {
507 write!(url_str, "?uploadId={}", upload_id).expect("Could not write to url_str");
508 }
509 Command::CompleteMultipartUpload { upload_id, .. } => {
510 write!(url_str, "?uploadId={}", upload_id).expect("Could not write to url_str");
511 }
512 Command::GetObjectTorrent => url_str.push_str("?torrent"),
513 Command::PutObject { multipart, .. } => {
514 if let Some(multipart) = multipart {
515 url_str.push_str(&multipart.query_string())
516 }
517 }
518 Command::GetBucketLifecycle
519 | Command::PutBucketLifecycle { .. }
520 | Command::DeleteBucketLifecycle => {
521 url_str.push_str("?lifecycle");
522 }
523 Command::GetBucketCors { .. }
524 | Command::PutBucketCors { .. }
525 | Command::DeleteBucketCors { .. } => {
526 url_str.push_str("?cors");
527 }
528 Command::GetObjectAttributes { version_id, .. } => {
529 if let Some(version_id) = version_id {
530 url_str.push_str(&format!("?attributes&versionId={}", version_id));
531 } else {
532 url_str.push_str("?attributes&versionId=null");
533 }
534 }
535 Command::HeadObject => {}
536 Command::DeleteObject => {}
537 Command::DeleteObjectTagging => {}
538 Command::GetObject => {}
539 Command::GetObjectRange { .. } => {}
540 Command::GetObjectTagging => {}
541 Command::ListObjects { .. } => {}
542 Command::ListObjectsV2 { .. } => {}
543 Command::GetBucketLocation => {}
544 Command::PresignGet { .. } => {}
545 Command::PresignPut { .. } => {}
546 Command::PresignDelete { .. } => {}
547 Command::DeleteBucket => {}
548 Command::ListBuckets => {}
549 Command::CopyObject { .. } => {}
550 Command::PutObjectTagging { .. } => {}
551 Command::UploadPart { .. } => {}
552 Command::CreateBucket { .. } => {}
553 }
554
555 let mut url = Url::parse(&url_str)?;
556
557 for (key, value) in &self.bucket().extra_query {
558 url.query_pairs_mut().append_pair(key, value);
559 }
560
561 if let Command::ListObjectsV2 {
562 prefix,
563 delimiter,
564 continuation_token,
565 start_after,
566 max_keys,
567 } = self.command().clone()
568 {
569 let mut query_pairs = url.query_pairs_mut();
570 delimiter.map(|d| query_pairs.append_pair("delimiter", &d));
571
572 query_pairs.append_pair("prefix", &prefix);
573 query_pairs.append_pair("list-type", "2");
574 if let Some(token) = continuation_token {
575 query_pairs.append_pair("continuation-token", &token);
576 }
577 if let Some(start_after) = start_after {
578 query_pairs.append_pair("start-after", &start_after);
579 }
580 if let Some(max_keys) = max_keys {
581 query_pairs.append_pair("max-keys", &max_keys.to_string());
582 }
583 }
584
585 if let Command::ListObjects {
586 prefix,
587 delimiter,
588 marker,
589 max_keys,
590 } = self.command().clone()
591 {
592 let mut query_pairs = url.query_pairs_mut();
593 delimiter.map(|d| query_pairs.append_pair("delimiter", &d));
594
595 query_pairs.append_pair("prefix", &prefix);
596 if let Some(marker) = marker {
597 query_pairs.append_pair("marker", &marker);
598 }
599 if let Some(max_keys) = max_keys {
600 query_pairs.append_pair("max-keys", &max_keys.to_string());
601 }
602 }
603
604 match self.command() {
605 Command::ListMultipartUploads {
606 prefix,
607 delimiter,
608 key_marker,
609 max_uploads,
610 } => {
611 let mut query_pairs = url.query_pairs_mut();
612 delimiter.map(|d| query_pairs.append_pair("delimiter", d));
613 if let Some(prefix) = prefix {
614 query_pairs.append_pair("prefix", prefix);
615 }
616 if let Some(key_marker) = key_marker {
617 query_pairs.append_pair("key-marker", &key_marker);
618 }
619 if let Some(max_uploads) = max_uploads {
620 query_pairs.append_pair("max-uploads", max_uploads.to_string().as_str());
621 }
622 }
623 Command::PutObjectTagging { .. }
624 | Command::GetObjectTagging
625 | Command::DeleteObjectTagging => {
626 url.query_pairs_mut().append_pair("tagging", "");
627 }
628 _ => {}
629 }
630
631 Ok(url)
632 }
633
634 fn canonical_request(&self, headers: &HeaderMap) -> Result<String, S3Error> {
635 signing::canonical_request(
636 &self.command().http_verb().to_string(),
637 &self.url()?,
638 headers,
639 &self.command().sha256()?,
640 )
641 }
642
643 #[maybe_async::maybe_async]
644 async fn authorization(&self, headers: &HeaderMap) -> Result<String, S3Error> {
645 let canonical_request = self.canonical_request(headers)?;
646 let string_to_sign = self.string_to_sign(&canonical_request)?;
647 let mut hmac = signing::HmacSha256::new_from_slice(&self.signing_key().await?)?;
648 hmac.update(string_to_sign.as_bytes());
649 let signature = hex::encode(hmac.finalize().into_bytes());
650 let signed_header = signing::signed_header_string(headers);
651 signing::authorization_header(
652 &self
653 .bucket()
654 .access_key()
655 .await?
656 .expect("No access_key provided"),
657 &self.datetime(),
658 &self.bucket().region(),
659 &signed_header,
660 &signature,
661 )
662 }
663
664 #[maybe_async::maybe_async]
665 async fn headers(&self) -> Result<HeaderMap, S3Error> {
666 let sha256 = self.command().sha256()?;
668
669 let mut headers = HeaderMap::new();
673
674 for (k, v) in self.bucket().extra_headers.iter() {
675 if k.as_str().starts_with("x-amz-meta-") {
676 match self.command() {
678 Command::UploadPart { .. }
679 | Command::AbortMultipartUpload { .. }
680 | Command::CompleteMultipartUpload { .. }
681 | Command::PutObject {
682 multipart: Some(_), ..
683 } => continue,
684 _ => (),
685 }
686 }
687 headers.insert(k.clone(), v.clone());
688 }
689
690 if let Command::PutObject { custom_headers, .. } = self.command()
692 && let Some(custom_headers) = custom_headers
693 {
694 for (k, v) in custom_headers.iter() {
695 headers.insert(k.clone(), v.clone());
696 }
697 }
698
699 let host_header = self.host_header();
700
701 headers.insert(HOST, host_header.parse()?);
702
703 match self.command() {
704 Command::CopyObject { from } => {
705 headers.insert(HeaderName::from_static("x-amz-copy-source"), from.parse()?);
706 }
707 Command::ListObjects { .. } => {}
708 Command::ListObjectsV2 { .. } => {}
709 Command::GetObject => {}
710 Command::GetObjectTagging => {}
711 Command::GetBucketLocation => {}
712 Command::ListBuckets => {}
713 _ => {
714 headers.insert(
715 CONTENT_LENGTH,
716 self.command().content_length()?.to_string().parse()?,
717 );
718 headers.insert(CONTENT_TYPE, self.command().content_type().parse()?);
719 }
720 }
721 headers.insert(
722 HeaderName::from_static("x-amz-content-sha256"),
723 sha256.parse()?,
724 );
725 headers.insert(
726 HeaderName::from_static("x-amz-date"),
727 self.long_date()?.parse()?,
728 );
729
730 if let Some(session_token) = self.bucket().session_token().await? {
731 headers.insert(
732 HeaderName::from_static("x-amz-security-token"),
733 session_token.parse()?,
734 );
735 } else if let Some(security_token) = self.bucket().security_token().await? {
736 headers.insert(
737 HeaderName::from_static("x-amz-security-token"),
738 security_token.parse()?,
739 );
740 }
741
742 if let Command::PutObjectTagging { tags } = self.command() {
743 let digest = md5::compute(tags);
744 let hash = general_purpose::STANDARD.encode(digest.as_ref());
745 headers.insert(HeaderName::from_static("content-md5"), hash.parse()?);
746 } else if let Command::PutObject { content, .. } = self.command() {
747 let digest = md5::compute(content);
748 let hash = general_purpose::STANDARD.encode(digest.as_ref());
749 headers.insert(HeaderName::from_static("content-md5"), hash.parse()?);
750 } else if let Command::UploadPart { content, .. } = self.command() {
751 let digest = md5::compute(content);
752 let hash = general_purpose::STANDARD.encode(digest.as_ref());
753 headers.insert(HeaderName::from_static("content-md5"), hash.parse()?);
754 } else if let Command::GetObject {} = self.command() {
755 headers.insert(ACCEPT, "application/octet-stream".to_string().parse()?);
756 } else if let Command::GetObjectRange { start, end } = self.command() {
758 headers.insert(ACCEPT, "application/octet-stream".to_string().parse()?);
759
760 let mut range = format!("bytes={}-", start);
761
762 if let Some(end) = end {
763 range.push_str(&end.to_string());
764 }
765
766 headers.insert(RANGE, range.parse()?);
767 } else if let Command::CreateBucket { ref config } = self.command() {
768 config.add_headers(&mut headers)?;
769 } else if let Command::PutBucketLifecycle { ref configuration } = self.command() {
770 let digest = md5::compute(to_string(configuration)?.as_bytes());
771 let hash = general_purpose::STANDARD.encode(digest.as_ref());
772 headers.insert(HeaderName::from_static("content-md5"), hash.parse()?);
773 } else if let Command::PutBucketCors {
774 expected_bucket_owner,
775 configuration,
776 ..
777 } = self.command()
778 {
779 let digest = md5::compute(configuration.to_string().as_bytes());
780 let hash = general_purpose::STANDARD.encode(digest.as_ref());
781 headers.insert(HeaderName::from_static("content-md5"), hash.parse()?);
782
783 headers.insert(
784 HeaderName::from_static("x-amz-expected-bucket-owner"),
785 expected_bucket_owner.parse()?,
786 );
787 } else if let Command::GetBucketCors {
788 expected_bucket_owner,
789 } = self.command()
790 {
791 headers.insert(
792 HeaderName::from_static("x-amz-expected-bucket-owner"),
793 expected_bucket_owner.parse()?,
794 );
795 } else if let Command::DeleteBucketCors {
796 expected_bucket_owner,
797 } = self.command()
798 {
799 headers.insert(
800 HeaderName::from_static("x-amz-expected-bucket-owner"),
801 expected_bucket_owner.parse()?,
802 );
803 } else if let Command::GetObjectAttributes {
804 expected_bucket_owner,
805 ..
806 } = self.command()
807 {
808 headers.insert(
809 HeaderName::from_static("x-amz-expected-bucket-owner"),
810 expected_bucket_owner.parse()?,
811 );
812 headers.insert(
813 HeaderName::from_static("x-amz-object-attributes"),
814 "ETag".parse()?,
815 );
816 }
817
818 if self.bucket().secret_key().await?.is_some() {
820 let authorization = self.authorization(&headers).await?;
821 headers.insert(AUTHORIZATION, authorization.parse()?);
822 }
823
824 headers.insert(DATE, self.datetime().format(&Rfc2822)?.parse()?);
831
832 Ok(headers)
833 }
834}
835
836#[cfg(all(test, feature = "with-tokio"))]
837mod tests {
838 use super::*;
839 use bytes::Bytes;
840 use futures_util::stream;
841 use tokio::io::AsyncReadExt;
842
843 #[tokio::test]
844 async fn test_async_read_implementation() {
845 let chunks = vec![
847 Ok(Bytes::from("Hello, ")),
848 Ok(Bytes::from("World!")),
849 Ok(Bytes::from(" This is a test.")),
850 ];
851
852 let stream = stream::iter(chunks);
853 let data_stream: DataStream = Box::pin(stream);
854
855 let mut response_stream = ResponseDataStream {
856 bytes: data_stream,
857 status_code: 200,
858 };
859
860 let mut buffer = Vec::new();
862 response_stream.read_to_end(&mut buffer).await.unwrap();
863
864 assert_eq!(buffer, b"Hello, World! This is a test.");
865 }
866
867 #[tokio::test]
868 async fn test_async_read_with_small_buffer() {
869 let chunks = vec![Ok(Bytes::from(
871 "This is a much longer string that won't fit in a small buffer",
872 ))];
873
874 let stream = stream::iter(chunks);
875 let data_stream: DataStream = Box::pin(stream);
876
877 let mut response_stream = ResponseDataStream {
878 bytes: data_stream,
879 status_code: 200,
880 };
881
882 let mut buffer = [0u8; 10];
884 let n = response_stream.read(&mut buffer).await.unwrap();
885
886 assert_eq!(n, 10);
888 assert_eq!(&buffer[..n], b"This is a ");
889
890 let n = response_stream.read(&mut buffer).await.unwrap();
892 assert_eq!(n, 0);
893 }
894
895 #[tokio::test]
896 async fn test_async_read_with_error() {
897 use crate::error::S3Error;
898
899 let chunks: Vec<Result<Bytes, S3Error>> = vec![
901 Ok(Bytes::from("Some data")),
902 Err(S3Error::Io(std::io::Error::new(
903 std::io::ErrorKind::Other,
904 "Test error",
905 ))),
906 ];
907
908 let stream = stream::iter(chunks);
909 let data_stream: DataStream = Box::pin(stream);
910
911 let mut response_stream = ResponseDataStream {
912 bytes: data_stream,
913 status_code: 200,
914 };
915
916 let mut buffer = [0u8; 20];
918 let n = response_stream.read(&mut buffer).await.unwrap();
919 assert_eq!(n, 9);
920 assert_eq!(&buffer[..n], b"Some data");
921
922 let result = response_stream.read(&mut buffer).await;
924 assert!(result.is_err());
925 }
926
927 #[tokio::test]
928 async fn test_async_read_copy() {
929 let chunks = vec![
931 Ok(Bytes::from("First chunk\n")),
932 Ok(Bytes::from("Second chunk\n")),
933 Ok(Bytes::from("Third chunk\n")),
934 ];
935
936 let stream = stream::iter(chunks);
937 let data_stream: DataStream = Box::pin(stream);
938
939 let mut response_stream = ResponseDataStream {
940 bytes: data_stream,
941 status_code: 200,
942 };
943
944 let mut output = Vec::new();
945 tokio::io::copy(&mut response_stream, &mut output)
946 .await
947 .unwrap();
948
949 assert_eq!(output, b"First chunk\nSecond chunk\nThird chunk\n");
950 }
951}
952
953#[cfg(all(test, feature = "with-async-std"))]
954mod async_std_tests {
955 use super::*;
956 use async_std::io::ReadExt;
957 use bytes::Bytes;
958 use futures_util::stream;
959
960 #[async_std::test]
961 async fn test_async_read_implementation() {
962 let chunks = vec![
964 Ok(Bytes::from("Hello, ")),
965 Ok(Bytes::from("World!")),
966 Ok(Bytes::from(" This is a test.")),
967 ];
968
969 let stream = stream::iter(chunks);
970 let data_stream: DataStream = Box::pin(stream);
971
972 let mut response_stream = ResponseDataStream {
973 bytes: data_stream,
974 status_code: 200,
975 };
976
977 let mut buffer = Vec::new();
979 response_stream.read_to_end(&mut buffer).await.unwrap();
980
981 assert_eq!(buffer, b"Hello, World! This is a test.");
982 }
983
984 #[async_std::test]
985 async fn test_async_read_with_small_buffer() {
986 let chunks = vec![Ok(Bytes::from(
988 "This is a much longer string that won't fit in a small buffer",
989 ))];
990
991 let stream = stream::iter(chunks);
992 let data_stream: DataStream = Box::pin(stream);
993
994 let mut response_stream = ResponseDataStream {
995 bytes: data_stream,
996 status_code: 200,
997 };
998
999 let mut buffer = [0u8; 10];
1001 let n = response_stream.read(&mut buffer).await.unwrap();
1002
1003 assert_eq!(n, 10);
1005 assert_eq!(&buffer[..n], b"This is a ");
1006
1007 let n = response_stream.read(&mut buffer).await.unwrap();
1009 assert_eq!(n, 0);
1010 }
1011
1012 #[async_std::test]
1013 async fn test_async_read_with_error() {
1014 use crate::error::S3Error;
1015
1016 let chunks: Vec<Result<Bytes, S3Error>> = vec![
1018 Ok(Bytes::from("Some data")),
1019 Err(S3Error::Io(std::io::Error::new(
1020 std::io::ErrorKind::Other,
1021 "Test error",
1022 ))),
1023 ];
1024
1025 let stream = stream::iter(chunks);
1026 let data_stream: DataStream = Box::pin(stream);
1027
1028 let mut response_stream = ResponseDataStream {
1029 bytes: data_stream,
1030 status_code: 200,
1031 };
1032
1033 let mut buffer = [0u8; 20];
1035 let n = response_stream.read(&mut buffer).await.unwrap();
1036 assert_eq!(n, 9);
1037 assert_eq!(&buffer[..n], b"Some data");
1038
1039 let result = response_stream.read(&mut buffer).await;
1041 assert!(result.is_err());
1042 }
1043
1044 #[async_std::test]
1045 async fn test_async_read_copy() {
1046 let chunks = vec![
1048 Ok(Bytes::from("First chunk\n")),
1049 Ok(Bytes::from("Second chunk\n")),
1050 Ok(Bytes::from("Third chunk\n")),
1051 ];
1052
1053 let stream = stream::iter(chunks);
1054 let data_stream: DataStream = Box::pin(stream);
1055
1056 let mut response_stream = ResponseDataStream {
1057 bytes: data_stream,
1058 status_code: 200,
1059 };
1060
1061 let mut output = Vec::new();
1062 async_std::io::copy(&mut response_stream, &mut output)
1063 .await
1064 .unwrap();
1065
1066 assert_eq!(output, b"First chunk\nSecond chunk\nThird chunk\n");
1067 }
1068}