use rusoto_s3::{S3Client, HeadObjectOutput};
use rusoto_s3::{DeleteObjectRequest, DeleteObjectsRequest, Delete, ObjectIdentifier, HeadObjectRequest, HeadObjectError, HeadBucketRequest, HeadBucketError};
pub use rusoto_core::region::Region;
use rusoto_core::RusotoError;
use rusoto_core::request::BufferedHttpResponse;
use rusoto_s3::S3 as S3Trait;
use rusoto_s3::{ListObjectsV2Request, ListObjectsV2Output};
use rusoto_s3::Object as S3Object;
use log::{trace, debug, error};
use itertools::unfold;
use std::borrow::Borrow;
use std::time::Duration;
use std::io::Read;
use std::error::Error;
use std::fmt;
use std::collections::{HashMap, HashSet};
use std::cell::RefCell;
use ensure::{Absent, Present, Ensure, Meet, External};
use ensure::CheckEnsureResult::*;
use either::Either;
use Either::*;
use problem::prelude::*;
use itertools::Itertools;
#[cfg(feature = "chrono")]
use chrono::{DateTime, FixedOffset};

pub use ensure;
pub use either;

pub trait Captures1<'i> {}
impl<'i, T> Captures1<'i> for T {}

pub trait Captures2<'i> {}
impl<'i, T> Captures2<'i> for T {}
#[derive(Debug)]
pub enum S3SyncError {
    RusotoError(RusotoError<Box<dyn Error + 'static>>),
    IoError(std::io::Error),
    NoBodyError,
    MissingObjectMetaData(&'static str),
    #[cfg(feature = "chrono")]
    ChronoError(chrono::ParseError),
}

impl fmt::Display for S3SyncError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            S3SyncError::RusotoError(err) => match err {
                RusotoError::Service(err) => write!(f, "AWS service-specific error occurred: {}", err),
                RusotoError::HttpDispatch(err) => write!(f, "AWS error occurred dispatching the HTTP request: {}", err),
                RusotoError::Credentials(err) => write!(f, "AWS error was encountered with credentials: {}", err),
                RusotoError::Validation(err) => write!(f, "AWS validation error occurred: {}", err),
                RusotoError::ParseError(err) => write!(f, "AWS error occurred parsing the response payload: {}", err),
                RusotoError::Unknown(err @ BufferedHttpResponse { status, .. }) =>
                    if let Some(reason) = status.canonical_reason() {
                        if err.body.is_empty() {
                            write!(f, "AWS HTTP error occurred: {}", reason)
                        } else {
                            write!(f, "AWS HTTP error occurred: {}: {}", reason, err.body_as_str())
                        }
                    } else {
                        write!(f, "unknown AWS HTTP error occurred: {:?}", err)
                    }
            },
            S3SyncError::IoError(_) => write!(f, "local I/O error"),
            S3SyncError::NoBodyError => write!(f, "expected body but found none"),
            S3SyncError::MissingObjectMetaData(meta) => write!(f, "expected object to have {} value but found none", meta),
            #[cfg(feature = "chrono")]
            S3SyncError::ChronoError(_) => write!(f, "error parsing timestamp"),
        }
    }
}

impl Error for S3SyncError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            S3SyncError::RusotoError(_) => None,
            S3SyncError::IoError(err) => Some(err),
            S3SyncError::NoBodyError => None,
            S3SyncError::MissingObjectMetaData(_) => None,
            #[cfg(feature = "chrono")]
            S3SyncError::ChronoError(err) => Some(err),
        }
    }
}

impl<T: Error + 'static> From<RusotoError<T>> for S3SyncError {
    fn from(err: RusotoError<T>) -> S3SyncError {
        match err {
            RusotoError::Service(err) => S3SyncError::RusotoError(RusotoError::Service(Box::new(err))),
            RusotoError::HttpDispatch(err) => S3SyncError::RusotoError(RusotoError::HttpDispatch(err)),
            RusotoError::Credentials(err) => S3SyncError::RusotoError(RusotoError::Credentials(err)),
            RusotoError::Validation(err) => S3SyncError::RusotoError(RusotoError::Validation(err)),
            RusotoError::ParseError(err) => S3SyncError::RusotoError(RusotoError::ParseError(err)),
            RusotoError::Unknown(err) => S3SyncError::RusotoError(RusotoError::Unknown(err)),
        }
    }
}

impl From<std::io::Error> for S3SyncError {
    fn from(err: std::io::Error) -> S3SyncError {
        S3SyncError::IoError(err)
    }
}

#[cfg(feature = "chrono")]
impl From<chrono::ParseError> for S3SyncError {
    fn from(err: chrono::ParseError) -> S3SyncError {
        S3SyncError::ChronoError(err)
    }
}

#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
pub struct Bucket {
    name: String
}

impl External for Bucket {}

impl Bucket {
    pub fn from_name(name: impl Into<String>) -> Bucket {
        Bucket {
            name: name.into()
        }
    }

    pub fn name(&self) -> &str {
        &self.name
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BucketKey<'b> {
    pub bucket: &'b Present<Bucket>,
    pub key: String,
}

impl External for BucketKey<'_> {}

impl<'b> BucketKey<'b> {
    pub fn from_key(bucket: &Present<Bucket>, key: impl Into<String>) -> BucketKey {
        BucketKey {
            bucket,
            key: key.into(),
        }
    }

    pub fn bucket_name(&self) -> &str {
        self.bucket.name()
    }

    pub fn key(&self) -> &str {
        &self.key
    }

    pub fn bucket(&self) -> &Present<Bucket> {
        self.bucket
    }

    pub fn assume_present(self) -> Present<BucketKey<'b>> {
        Present(self)
    }
}

impl<'b> From<Object<'b>> for BucketKey<'b> {
    fn from(obj: Object<'b>) -> BucketKey<'b> {
        obj.bucket_key.0
    }
}

impl<'b> From<Object<'b>> for Present<BucketKey<'b>> {
    fn from(obj: Object<'b>) -> Present<BucketKey<'b>> {
        obj.bucket_key
    }
}

impl<'b> Borrow<Present<BucketKey<'b>>> for Object<'b> {
    fn borrow(&self) -> &Present<BucketKey<'b>> {
        self.bucket_key()
    }
}

impl<'b> From<Present<BucketKey<'b>>> for BucketKey<'b> {
    fn from(obj: Present<BucketKey<'b>>) -> BucketKey<'b> {
        obj.0
    }
}

impl<'b> From<Absent<BucketKey<'b>>> for BucketKey<'b> {
    fn from(obj: Absent<BucketKey<'b>>) -> BucketKey<'b> {
        obj.0
    }
}

#[derive(Debug, PartialEq, Eq)]
pub struct Object<'b> {
    pub bucket_key: Present<BucketKey<'b>>,
    pub size: i64,
    pub e_tag: String,
    pub last_modified: LastModified,
}

impl<'b> Object<'b> {
    fn from_head(
        bucket_key: BucketKey<'b>,
        res: HeadObjectOutput,
    ) -> Result<Object, S3SyncError> {
        Ok(Object {
            bucket_key: Present(bucket_key),
            e_tag: res.e_tag.ok_or(S3SyncError::MissingObjectMetaData("e_tag"))?,
            last_modified: res.last_modified.map(LastModified::Rfc2822)
                .ok_or(S3SyncError::MissingObjectMetaData("last_modified"))?,
            size: res.content_length.ok_or(S3SyncError::MissingObjectMetaData("content_length"))?,
        })
    }

    fn from_s3_object(bucket: &Present<Bucket>, object: S3Object) -> Result<Object, S3SyncError> {
        Ok(Object {
            bucket_key: Present(BucketKey::from_key(bucket, object.key
                .ok_or(S3SyncError::MissingObjectMetaData("key"))?)),
            e_tag: object.e_tag.ok_or(S3SyncError::MissingObjectMetaData("e_tag"))?,
            last_modified: object.last_modified.map(LastModified::Rfc3339)
                .ok_or(S3SyncError::MissingObjectMetaData("last_modified"))?,
            size: object.size.ok_or(S3SyncError::MissingObjectMetaData("size"))?,
        })
    }

    pub fn bucket_key(&self) -> &Present<BucketKey<'b>> {
        &self.bucket_key
    }

    pub fn unwrap_bucket_key(self) -> Present<BucketKey<'b>> {
        self.bucket_key
    }

    pub fn size(&self) -> i64 {
        self.size
    }

    pub fn e_tag(&self) -> &str {
        &self.e_tag
    }

    pub fn last_modified(&self) -> &LastModified {
        &self.last_modified
    }
}

pub trait PresentBucketKeyName {
    fn bucket_name(&self) -> &str;
    fn key(&self) -> &str;
}

impl<'b> PresentBucketKeyName for Present<BucketKey<'b>> {
    fn bucket_name(&self) -> &str {
        self.bucket.name()
    }

    fn key(&self) -> &str {
        &self.key
    }
}

impl<'b> PresentBucketKeyName for Object<'b> {
    fn bucket_name(&self) -> &str {
        self.bucket_key.bucket_name()
    }

    fn key(&self) -> &str {
        self.bucket_key.key()
    }
}

#[derive(Debug, PartialEq, Eq)]
pub enum LastModified {
    Rfc2822(String),
    Rfc3339(String),
}

impl fmt::Display for LastModified {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.as_str())
    }
}

impl LastModified {
    pub fn as_str(&self) -> &str {
        match self {
            LastModified::Rfc2822(dt) => &dt,
            LastModified::Rfc3339(dt) => &dt,
        }
    }

    #[cfg(feature = "chrono")]
    pub fn parse(&self) -> Result<DateTime<FixedOffset>, S3SyncError> {
        Ok(match &self {
            LastModified::Rfc2822(lm) => DateTime::parse_from_rfc2822(lm),
            LastModified::Rfc3339(lm) => DateTime::parse_from_rfc3339(lm),
        }?)
    }
}

struct PaginationIter<RQ, RS, SSA, GSA, FF, E>
where RQ: Clone, SSA: Fn(&mut RQ, String), GSA: Fn(&RS) -> Option<String>, FF: Fn(RQ) -> Result<RS, E> {
    request: RQ,
    set_start_after: SSA,
    get_start_after: GSA,
    fetch: FF,
    done: bool
}

impl<RQ, RS, SSA, GSA, FF, E> Iterator for PaginationIter<RQ, RS, SSA, GSA, FF, E>
where RQ: Clone, SSA: Fn(&mut RQ, String), GSA: Fn(&RS) -> Option<String>, FF: Fn(RQ) -> Result<RS, E> {
    type Item = Result<RS, E>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None
        }
        Some((self.fetch)(self.request.clone()).map(|response| {
            if let Some(start_after) = (self.get_start_after)(&response) {
                (self.set_start_after)(&mut self.request, start_after);
            } else {
                self.done = true;
            }
            response
        }))
    }
}

#[derive(Debug)]
pub enum TransferStatus {
    Init,
    Progress(TransferStats),
    Done(TransferStats),
    Failed(String),
}

impl Default for TransferStatus {
    fn default() -> Self {
        TransferStatus::Init
    }
}

impl TransferStatus {
    fn update(&mut self, stats: TransferStats) {
        match self {
            TransferStatus::Init => {
                *self = TransferStatus::Progress(stats);
            }
            TransferStatus::Progress(ref mut s) => {
                s.buffers += stats.buffers;
                s.bytes = stats.bytes;
                s.bytes_total += stats.bytes_total;
            },
            _ => panic!("TransferStatus in bad state for .update(): {:?}", self),
        }
    }

    fn done(self) -> Self {
        match self {
            TransferStatus::Progress(stats) => TransferStatus::Done(stats),
            _ => panic!("TransferStatus in bad state for .done(): {:?}", self),
        }
    }

    fn failed(self, err: String) -> Self {
        match self {
            TransferStatus::Init |
            TransferStatus::Progress(_) => TransferStatus::Failed(err),
            _ => panic!("TransferStatus in bad state for .failed(): {:?}", self),
        }
    }
}

#[derive(Debug, Default)]
pub struct TransferStats {
    pub buffers: u16,
    pub bytes: u64,
    pub bytes_total: u64,
}

#[derive(Debug)]
pub struct ObjectBodyMeta {
    pub content_type: String,
    pub content_disposition: Option<String>,
    pub content_language: Option<String>,
}

impl Default for ObjectBodyMeta {
    fn default() -> ObjectBodyMeta {
        ObjectBodyMeta {
            content_type: "application/octet-stream".to_owned(),
            content_disposition: None,
            content_language: None,
        }
    }
}

#[derive(Debug)]
pub enum CheckObjectImpl {
    Head,
    List,
}

#[derive(Debug)]
pub struct Settings {
    pub part_size: usize,
    pub timeout: Duration,
    pub data_timeout: Duration,
    pub max_multipart_upload_parts: usize,
    pub max_delete_objects: usize,
}

impl Default for Settings {
    fn default() -> Settings {
        Settings {
            part_size: 10 * 1024 * 1024,
            timeout: Duration::from_secs(10),
            data_timeout: Duration::from_secs(300),
            max_multipart_upload_parts: 10_000,
            max_delete_objects: 1_000,
        }
    }
}

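/// A minimal construction sketch, added for illustration (not from the original docs). It
/// assumes the crate is built under the name `s3_sync`; the endpoint URL is only an example
/// (e.g. a local MinIO instance):
///
/// ```no_run
/// use s3_sync::{S3, Region, Settings};
/// use std::time::Duration;
///
/// // `S3::default()` uses the default region resolution; here we point the client at a
/// // custom endpoint and raise the per-request timeout instead.
/// let s3 = S3::new(
///     Region::EuWest1,
///     "http://localhost:9000".to_owned(),
///     Settings { timeout: Duration::from_secs(30), ..Default::default() },
/// );
/// ```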
pub struct S3 {
    client: S3Client,
    on_upload_progress: Option<RefCell<Box<dyn FnMut(&TransferStatus)>>>,
    part_size: usize,
    timeout: Duration,
    data_timeout: Duration,
    max_multipart_upload_parts: usize,
    max_delete_objects: usize,
}

impl fmt::Debug for S3 {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("S3")
            .field("part_size", &self.part_size)
            .field("timeout", &self.timeout)
            .field("data_timeout", &self.data_timeout)
            .finish()
    }
}

impl Default for S3 {
    fn default() -> S3 {
        S3::new(None, None, None)
    }
}

impl S3 {
    pub fn new(
        region: impl Into<Option<Region>>,
        region_endpoint: impl Into<Option<String>>,
        settings: impl Into<Option<Settings>>
    ) -> S3 {
        let region = match (region.into(), region_endpoint.into()) {
            (Some(region), Some(endpoint)) => Region::Custom { name: region.name().to_owned(), endpoint },
            (None, Some(endpoint)) => Region::Custom { name: Region::default().name().to_owned(), endpoint },
            (Some(region), None) => region,
            _ => Region::default(),
        };
        let settings = settings.into().unwrap_or_default();

        S3 {
            client: S3Client::new(region),
            on_upload_progress: None,
            part_size: settings.part_size,
            timeout: settings.timeout,
            data_timeout: settings.data_timeout,
            max_multipart_upload_parts: settings.max_multipart_upload_parts,
            max_delete_objects: settings.max_delete_objects,
        }
    }

    pub fn with_region(region: Region) -> S3 {
        S3::new(region, None, None)
    }

    pub fn part_size(&self) -> usize {
        self.part_size
    }

    pub fn max_upload_size(&self) -> usize {
        self.part_size * self.max_multipart_upload_parts
    }

    pub fn on_upload_progress(
        &mut self,
        callback: impl FnMut(&TransferStatus) + 'static
    ) -> Option<Box<dyn FnMut(&TransferStatus)>> {
        let ret = self.on_upload_progress.take();
        self.on_upload_progress = Some(RefCell::new(Box::new(callback)));
        ret.map(|c| c.into_inner())
    }

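    /// Illustrative sketch (not from the original docs): install a progress callback for the
    /// duration of a single upload. The bucket and key names are assumed to exist.
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket, BucketKey, ObjectBodyMeta, TransferStatus};
    /// use std::io::Cursor;
    ///
    /// let mut s3 = S3::default();
    /// let bucket = s3.check_bucket_exists(Bucket::from_name("my-bucket")).unwrap()
    ///     .left().expect("bucket does not exist");
    /// let body = Cursor::new(b"hello world".to_vec());
    ///
    /// s3.with_on_upload_progress(
    ///     |status| {
    ///         if let TransferStatus::Progress(stats) = status {
    ///             println!("transferred {} bytes so far", stats.bytes_total);
    ///         }
    ///     },
    ///     |s3| s3.put_object(BucketKey::from_key(&bucket, "hello.txt"), body, ObjectBodyMeta::default()),
    /// ).unwrap();
    /// ```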
    pub fn with_on_upload_progress<O>(
        &mut self,
        callback: impl FnMut(&TransferStatus) + 'static,
        f: impl FnOnce(&mut Self) -> O
    ) -> O {
        let old = self.on_upload_progress(callback);
        let ret = f(self);
        old.map(|callback| self.on_upload_progress(callback));
        ret
    }

    fn notify_upload_progress(&self, status: &TransferStatus) {
        self.on_upload_progress.as_ref().map(|c| {
            c.try_borrow_mut().expect("S3 upload_progress closure already borrowed mutably").as_mut()(status);
        });
    }

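    /// Illustrative sketch (not from the original docs); the bucket name is assumed:
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket};
    ///
    /// let s3 = S3::default();
    /// let bucket = s3.check_bucket_exists(Bucket::from_name("my-bucket")).unwrap()
    ///     .left()
    ///     .expect("bucket does not exist");
    /// println!("bucket {} exists", bucket.name());
    /// ```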
    pub fn check_bucket_exists(&self, bucket: Bucket) -> Result<Either<Present<Bucket>, Absent<Bucket>>, S3SyncError> {
        let res = self.client.head_bucket(HeadBucketRequest {
            bucket: bucket.name.clone(),
            .. Default::default()
        }).with_timeout(self.timeout).sync();
        trace!("Head bucket response: {:?}", res);

        match res {
            Ok(_) => Ok(Left(Present(bucket))),
            Err(RusotoError::Service(HeadBucketError::NoSuchBucket(_))) => Ok(Right(Absent(bucket))),
            Err(RusotoError::Unknown(BufferedHttpResponse { status, .. })) if status.as_u16() == 404 => Ok(Right(Absent(bucket))),
            Err(err) => Err(err.into())
        }
    }

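    /// Illustrative sketch (not from the original docs): check for a key with the `List`
    /// implementation and inspect the returned metadata. Bucket and key names are assumed.
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket, BucketKey, CheckObjectImpl};
    /// use s3_sync::either::Either::{Left, Right};
    ///
    /// let s3 = S3::default();
    /// let bucket = s3.check_bucket_exists(Bucket::from_name("my-bucket")).unwrap()
    ///     .left().expect("bucket does not exist");
    ///
    /// match s3.check_object_exists(BucketKey::from_key(&bucket, "some/key"), CheckObjectImpl::List).unwrap() {
    ///     Left(object) => println!("{} bytes, last modified {}", object.size(), object.last_modified()),
    ///     Right(_absent) => println!("object not found"),
    /// }
    /// ```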
    pub fn check_object_exists<'s, 'b>(&'s self, bucket_key: BucketKey<'b>, implementation: CheckObjectImpl)
        -> Result<Either<Object<'b>, Absent<BucketKey<'b>>>, S3SyncError> {
        match implementation {
            CheckObjectImpl::List => self.check_object_exists_list(bucket_key),
            CheckObjectImpl::Head => self.check_object_exists_head(bucket_key),
        }
    }

    pub fn check_object_exists_head<'s, 'b>(&'s self, bucket_key: BucketKey<'b>)
        -> Result<Either<Object<'b>, Absent<BucketKey<'b>>>, S3SyncError> {
        let res = self.client.head_object(HeadObjectRequest {
            bucket: bucket_key.bucket.name.clone(),
            key: bucket_key.key.clone(),
            .. Default::default()
        }).with_timeout(self.timeout).sync();
        trace!("Head response: {:?}", res);

        match res {
            Ok(res) => Ok(Left(Object::from_head(bucket_key, res)?)),
            Err(RusotoError::Service(HeadObjectError::NoSuchKey(_))) =>
                Ok(Right(Absent(bucket_key))),
            Err(RusotoError::Unknown(BufferedHttpResponse { status, .. })) if status.as_u16() == 404 =>
                Ok(Right(Absent(bucket_key))),
            Err(err) => Err(err.into())
        }
    }

    pub fn check_object_exists_list<'s, 'b>(&'s self, bucket_key: BucketKey<'b>)
        -> Result<Either<Object<'b>, Absent<BucketKey<'b>>>, S3SyncError> {
        let request = ListObjectsV2Request {
            bucket: bucket_key.bucket.name().to_owned(),
            prefix: Some(bucket_key.key.clone()),
            max_keys: Some(1),
            .. Default::default()
        };

        let res = self.client.list_objects_v2(request).with_timeout(self.timeout).sync()?;
        let first_obj = res.contents.and_then(|list| list.into_iter().next());
        match first_obj {
            Some(obj) if obj.key.as_deref().expect("S3 object has no key!") == bucket_key.key =>
                Ok(Left(Object::from_s3_object(bucket_key.bucket, obj)?)),
            _ => Ok(Right(Absent(bucket_key)))
        }
    }

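    /// Illustrative sketch (not from the original docs): list every object under a prefix.
    /// Bucket name and prefix are assumed.
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket, PresentBucketKeyName};
    ///
    /// let s3 = S3::default();
    /// let bucket = s3.check_bucket_exists(Bucket::from_name("my-bucket")).unwrap()
    ///     .left().expect("bucket does not exist");
    ///
    /// for object in s3.list_objects(&bucket, "logs/".to_owned()) {
    ///     let object = object.unwrap();
    ///     println!("{} ({} bytes)", object.key(), object.size());
    /// }
    /// ```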
    pub fn list_objects<'b, 's: 'b>(&'s self, bucket: &'b Present<Bucket>, prefix: String)
        -> impl Iterator<Item = Result<Object<'b>, S3SyncError>> + Captures1<'s> + Captures2<'b> {
        let client = &self.client;
        let pages = PaginationIter {
            request: ListObjectsV2Request {
                bucket: bucket.name().to_owned(),
                prefix: Some(prefix),
                .. Default::default()
            },
            set_start_after: |request: &mut ListObjectsV2Request, start_after| {
                request.start_after = Some(start_after);
            },
            get_start_after: |response: &ListObjectsV2Output| {
                response.contents.as_ref()
                    .and_then(|objects| objects.last().and_then(|last| last.key.as_ref().map(|r| r.clone())))
            },
            fetch: move |request: ListObjectsV2Request| {
                client.list_objects_v2(request).with_timeout(self.timeout).sync().map_err(Into::into)
            },
            done: false
        };

        pages.flat_map(move |response| {
            let mut error = None;
            let mut objects = None;
            match response {
                Err(err) => error = Some(err),
                Ok(output) => objects = output.contents.map(|objects| objects.into_iter()),
            }

            unfold((), move |_| {
                if let Some(error) = error.take() {
                    Some(Err(error))
                } else {
                    objects.as_mut().and_then(|obj| obj.next()).map(|o| Ok(Object::from_s3_object(bucket, o)?))
                }
            })
        })
    }

    fn _get_body(&self, bucket: String, key: String) -> Result<impl Read + Send + '_, S3SyncError> {
        use rusoto_s3::GetObjectRequest;
        let req = GetObjectRequest {
            bucket,
            key,
            .. Default::default()
        };
        self.client.get_object(req)
            .with_timeout(self.data_timeout)
            .sync()
            .map_err(Into::into)
            .and_then(|output| output.body.ok_or(S3SyncError::NoBodyError))
            .map(|body| body.into_blocking_read())
    }

    pub fn get_body(&self, bucket_key: &impl PresentBucketKeyName) -> Result<impl Read + Send + '_, S3SyncError> {
        let bucket = bucket_key.bucket_name().to_owned();
        let key = bucket_key.key().to_owned();
        self._get_body(bucket, key)
    }

    pub fn get_body_box(&self, bucket_key: &impl PresentBucketKeyName) -> Result<Box<dyn Read + Send + '_>, S3SyncError> {
        let bucket = bucket_key.bucket_name().to_owned();
        let key = bucket_key.key().to_owned();
        self._get_body(bucket, key).map(|r| Box::new(r) as Box<dyn Read + Send>)
    }

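    /// Illustrative sketch (not from the original docs): read an object body, retrying the
    /// `GetObject` call up to three times. Bucket and key names are assumed.
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket, BucketKey};
    /// use std::io::Read;
    ///
    /// let s3 = S3::default();
    /// let bucket = s3.check_bucket_exists(Bucket::from_name("my-bucket")).unwrap()
    ///     .left().expect("bucket does not exist");
    /// let key = BucketKey::from_key(&bucket, "some/key").assume_present();
    ///
    /// let mut body = String::new();
    /// s3.get_body_with_retry(&key, 3, |left, err| {
    ///     eprintln!("retrying ({} attempts left): {}", left, err);
    ///     true // keep retrying until the attempts run out
    /// }).unwrap().read_to_string(&mut body).unwrap();
    /// ```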
    pub fn get_body_with_retry<'s, F>(
        &'s self, bucket_key: &impl PresentBucketKeyName,
        mut retries: u32,
        on_error: F
    ) -> Result<impl Read + Send + 's, S3SyncError>
    where F: Fn(u32, &S3SyncError) -> bool {
        let bucket = bucket_key.bucket_name().to_owned();
        let key = bucket_key.key().to_owned();

        loop {
            match self._get_body(bucket.clone(), key.clone()) {
                Ok(body) => return Ok(body),
                Err(err) => {
                    if retries == 0 || !on_error(retries, &err) {
                        return Err(err)
                    }
                    retries -= 1;
                }
            }
        }
    }

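    /// Illustrative sketch (not from the original docs): upload an in-memory buffer as an
    /// object. Note that the body must yield at least one byte, otherwise
    /// `S3SyncError::NoBodyError` is returned. Bucket and key names are assumed.
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket, BucketKey, ObjectBodyMeta};
    /// use std::io::Cursor;
    ///
    /// let s3 = S3::default();
    /// let bucket = s3.check_bucket_exists(Bucket::from_name("my-bucket")).unwrap()
    ///     .left().expect("bucket does not exist");
    ///
    /// let body = Cursor::new(b"hello world".to_vec());
    /// let object = s3.put_object(BucketKey::from_key(&bucket, "hello.txt"), body, ObjectBodyMeta::default()).unwrap();
    /// println!("uploaded {:?}", object);
    /// ```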
    pub fn put_object<'s, 'b>(
        &'s self,
        bucket_key: impl Into<BucketKey<'b>>,
        mut body: impl Read,
        meta: ObjectBodyMeta
    ) -> Result<Present<BucketKey<'b>>, S3SyncError> {
        use rusoto_s3::{CreateMultipartUploadRequest, UploadPartRequest, CompleteMultipartUploadRequest, AbortMultipartUploadRequest, CompletedMultipartUpload, CompletedPart};
        use rusoto_core::ByteStream;

        let bucket_key = bucket_key.into();
        let bucket_name = bucket_key.bucket.name.clone();
        let object_key = bucket_key.key.clone();

        let upload_id = self.client.create_multipart_upload(CreateMultipartUploadRequest {
            bucket: bucket_name.clone(),
            key: object_key.clone(),
            content_type: Some(meta.content_type),
            content_disposition: meta.content_disposition,
            content_language: meta.content_language,
            .. Default::default()
        })
        .with_timeout(self.timeout).sync()?
        .upload_id.expect("no upload ID");

        debug!("Started multipart upload {:?}", upload_id);

        let mut completed_parts = Vec::new();
        let mut progress = TransferStatus::default();

        self.notify_upload_progress(&progress);

        let result = || -> Result<_, S3SyncError> {
            let body = &mut body;

            for part_number in 1u16.. {
                let mut buf = Vec::with_capacity(self.part_size + 1);
                let bytes = body.take(self.part_size as u64).read_to_end(&mut buf)?;

                if bytes == 0 {
                    break
                }

                debug!("Uploading part {} ({} bytes)", part_number, bytes);
                let result = self.client.upload_part(UploadPartRequest {
                    body: Some(ByteStream::from(buf)),
                    bucket: bucket_name.clone(),
                    key: object_key.clone(),
                    part_number: part_number as i64,
                    upload_id: upload_id.clone(),
                    .. Default::default()
                }).with_timeout(self.data_timeout).sync()?;

                completed_parts.push(CompletedPart {
                    e_tag: result.e_tag,
                    part_number: Some(part_number as i64),
                });

                progress.update(TransferStats {
                    buffers: 1,
                    bytes: bytes as u64,
                    bytes_total: bytes as u64,
                });

                self.notify_upload_progress(&progress);
            }

            if completed_parts.is_empty() {
                return Err(S3SyncError::NoBodyError)
            }

            debug!("Multipart upload {:?} complete", upload_id);
            self.client.complete_multipart_upload(CompleteMultipartUploadRequest {
                bucket: bucket_name.clone(),
                key: object_key.clone(),
                upload_id: upload_id.clone(),
                multipart_upload: Some(CompletedMultipartUpload {
                    parts: Some(completed_parts)
                }),
                .. Default::default()
            }).with_timeout(self.timeout).sync()?;

            Ok(Present(bucket_key))
        }();

        if let Err(err) = &result {
            let err = Problem::from_error_message(err).to_string();
            error!("Aborting multipart upload {:?} due to error: {}", upload_id, err);
            self.client.abort_multipart_upload(AbortMultipartUploadRequest {
                bucket: bucket_name,
                key: object_key,
                upload_id,
                .. Default::default()
            }).with_timeout(self.timeout).sync().ok_or_log_warn();

            self.notify_upload_progress(&progress.failed(err));
        } else {
            self.notify_upload_progress(&progress.done());
        }

        result
    }

    pub fn delete_object<'b, 's: 'b>(
        &'s self,
        bucket_key: impl Into<BucketKey<'b>>
    ) -> Result<Absent<BucketKey<'b>>, S3SyncError> {
        let bucket_key = bucket_key.into();
        debug!("Deleting object {:?} from S3 bucket {:?}", bucket_key.key, bucket_key.bucket.name);
        let res = self.client.delete_object(DeleteObjectRequest {
            bucket: bucket_key.bucket.name.clone(),
            key: bucket_key.key.clone(),
            .. Default::default()
        }).with_timeout(self.timeout).sync()?;
        trace!("Delete response: {:?}", res);

        Ok(Absent(bucket_key))
    }

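    /// Illustrative sketch (not from the original docs): delete everything under a prefix by
    /// feeding `list_objects` into `delete_objects`. Objects are deleted in batches (up to
    /// `Settings::max_delete_objects` per request) and each batch yields one item. Bucket
    /// name and prefix are assumed.
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket};
    ///
    /// let s3 = S3::default();
    /// let bucket = s3.check_bucket_exists(Bucket::from_name("my-bucket")).unwrap()
    ///     .left().expect("bucket does not exist");
    ///
    /// let objects = s3.list_objects(&bucket, "tmp/".to_owned()).map(|o| o.unwrap());
    /// for batch in s3.delete_objects(objects) {
    ///     for result in batch.unwrap() {
    ///         if let Err((key, err)) = result {
    ///             eprintln!("failed to delete {}: {}", key.key(), err);
    ///         }
    ///     }
    /// }
    /// ```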
    pub fn delete_objects<'b, 's: 'b>(
        &'s self,
        bucket_keys: impl IntoIterator<Item = impl Into<BucketKey<'b>>>
    ) -> impl Iterator<Item = Result<Vec<Result<Absent<BucketKey<'b>>, (BucketKey<'b>, S3SyncError)>>, S3SyncError>> + Captures1<'s> + Captures2<'b> {
        let max_delete_objects = self.max_delete_objects;
        bucket_keys
            .into_iter()
            .map(|o| o.into())
            .peekable()
            .batching(move |bucket_keys| {
                let current_bucket_name = if let Some(bucket_key) = bucket_keys.peek() {
                    bucket_key.bucket.name.clone()
                } else {
                    return None
                };

                Some((current_bucket_name.clone(),
                    bucket_keys
                        .peeking_take_while(move |object| object.bucket.name == current_bucket_name)
                        .take(max_delete_objects)
                        .collect::<Vec<_>>()))
            })
            .map(move |(current_bucket_name, bucket_keys): (_, Vec<_>)| {
                debug!("Deleting batch of {} objects from S3 bucket {:?}", bucket_keys.len(), current_bucket_name);
                let res = self.client.delete_objects(DeleteObjectsRequest {
                    bucket: current_bucket_name.clone(),
                    delete: Delete {
                        objects: bucket_keys.iter().map(|bucket_key| ObjectIdentifier {
                            key: bucket_key.key.clone(),
                            .. Default::default()
                        }).collect::<Vec<_>>(),
                        .. Default::default()
                    },
                    .. Default::default()
                }).with_timeout(self.timeout).sync()?;
                trace!("Delete response: {:?}", res);

                let ok_objects =
                    if let Some(deleted) = res.deleted {
                        debug!("Deleted {} objects", deleted.len());
                        deleted.into_iter().map(|deleted| {
                            if let Some(key) = deleted.key {
                                Ok(key)
                            } else {
                                Err(S3SyncError::RusotoError(RusotoError::Validation("got S3 delete object errors but no key or message information".to_owned())))
                            }
                        }).collect::<Result<HashSet<_>, _>>()?
                    } else {
                        Default::default()
                    };

                let mut failed_objects =
                    if let Some(errors) = res.errors {
                        errors.into_iter().map(|error| {
                            error!("Error deleting S3 object {:?}: {}",
                                error.key.as_ref().map(|s| s.as_str()).unwrap_or("<None>"),
                                error.message.as_ref().map(|s| s.as_str()).unwrap_or("<None>"));

                            if let (Some(key), Some(error)) = (error.key, error.message) {
                                Ok((key, S3SyncError::RusotoError(RusotoError::Validation(error))))
                            } else {
                                Err(S3SyncError::RusotoError(RusotoError::Validation("got S3 delete object errors but no key or message information".to_owned())))
                            }
                        }).collect::<Result<HashMap<_, _>, _>>()?
                    } else {
                        Default::default()
                    };

                Ok(bucket_keys.into_iter().map(|k| {
                    if ok_objects.contains(&k.key) {
                        Ok(Absent(k))
                    } else if let Some(err) = failed_objects.remove(&k.key) {
                        Err((k, err))
                    } else {
                        Err((k, S3SyncError::RusotoError(RusotoError::Validation("S3 did not report this object as deleted or failed to be deleted".to_owned()))))
                    }
                }).collect::<Vec<_>>())
            })
    }

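    /// Illustrative sketch (not from the original docs): ensure an object exists, uploading
    /// the body only when it is absent. `Ensure` comes from the re-exported `ensure` crate;
    /// bucket and key names are assumed.
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket, BucketKey, ObjectBodyMeta, CheckObjectImpl};
    /// use s3_sync::ensure::Ensure;
    /// use std::io::Cursor;
    ///
    /// let s3 = S3::default();
    /// let bucket = s3.check_bucket_exists(Bucket::from_name("my-bucket")).unwrap()
    ///     .left().expect("bucket does not exist");
    ///
    /// let present = s3.object_present(BucketKey::from_key(&bucket, "hello.txt"), CheckObjectImpl::Head, || {
    ///     Ok((Cursor::new(b"hello world".to_vec()), ObjectBodyMeta::default()))
    /// }).ensure().unwrap();
    /// println!("ensured: {:?}", present);
    /// ```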
    pub fn object_present<'b, 's: 'b, R, F>(
        &'s self,
        bucket_key: BucketKey<'b>,
        check_impl: CheckObjectImpl,
        body: F
    ) -> impl Ensure<Present<BucketKey<'b>>, EnsureAction = impl Meet<Met = Present<BucketKey<'b>>, Error = S3SyncError> + Captures1<'s> + Captures2<'b>> + Captures1<'s> + Captures2<'b>
    where R: Read + 's, F: FnOnce() -> Result<(R, ObjectBodyMeta), std::io::Error> + 's {
        move || {
            Ok(match self.check_object_exists(bucket_key, check_impl)? {
                Left(present) => Met(present.into()),
                Right(absent) => EnsureAction(move || {
                    let (body, meta) = body()?;
                    self.put_object(absent, body, meta)
                })
            })
        }
    }

    pub fn object_absent<'b, 's: 'b>(
        &'s self,
        bucket_key: BucketKey<'b>,
        check_impl: CheckObjectImpl
    ) -> impl Ensure<Absent<BucketKey<'b>>, EnsureAction = impl Meet<Met = Absent<BucketKey<'b>>, Error = S3SyncError> + Captures1<'s> + Captures2<'b>> + Captures1<'s> + Captures2<'b> {
        move || {
            Ok(match self.check_object_exists(bucket_key, check_impl)? {
                Right(absent) => Met(absent),
                Left(present) => EnsureAction(move || {
                    self.delete_object(present)
                })
            })
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use ensure::ExternalState;
    use std::io::Cursor;

    fn s3_test_bucket() -> Bucket {
        Bucket::from_name(std::env::var("S3_TEST_BUCKET").expect("S3_TEST_BUCKET not set"))
    }

    fn test_key() -> String {
        use std::time::SystemTime;
        format!("s3-sync-test/foo-{}", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_micros())
    }

    #[test]
    fn test_get_body_bucket_key() {
        use std::io::Cursor;

        let s3 = S3::default();
        let body = Cursor::new(b"hello world".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let object = BucketKey::from_key(&bucket, test_key());

        let object = s3.put_object(object, body, ObjectBodyMeta::default()).unwrap();

        let mut body = Vec::new();

        s3.get_body(&object).unwrap().read_to_end(&mut body).unwrap();

        assert_eq!(&body, b"hello world");
    }

    #[test]
    fn test_get_body_object() {
        use std::io::Cursor;

        let s3 = S3::default();
        let body = Cursor::new(b"hello world".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let object = BucketKey::from_key(&bucket, test_key());

        let object = s3.put_object(object, body, ObjectBodyMeta::default()).unwrap();

        let mut body = Vec::new();

        let object = s3.check_object_exists(object.invalidate_state(), CheckObjectImpl::Head).unwrap().unwrap_left();
        s3.get_body(&object).unwrap().read_to_end(&mut body).unwrap();

        assert_eq!(&body, b"hello world");
    }

    #[test]
    fn test_object_present() {
        use std::io::Cursor;

        let s3 = S3::default();
        let body = Cursor::new(b"hello world".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let object = BucketKey::from_key(&bucket, test_key());

        let object = s3.object_present(object, CheckObjectImpl::List, move || Ok((body, ObjectBodyMeta::default()))).ensure().or_failed_to("make object present");

        assert!(s3.check_object_exists(object.invalidate_state(), CheckObjectImpl::Head).unwrap().is_left());
    }

    #[test]
    fn test_object_absent() {
        use std::io::Cursor;

        let s3 = S3::default();
        let body = Cursor::new(b"hello world".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let object = BucketKey::from_key(&bucket, test_key());

        let object = s3.put_object(object, body, ObjectBodyMeta::default()).unwrap().invalidate_state();

        let object = s3.object_absent(object, CheckObjectImpl::Head).ensure().or_failed_to("make object absent");

        assert!(s3.check_object_exists(object.invalidate_state(), CheckObjectImpl::List).unwrap().is_right());
    }

    #[test]
    fn test_object_present_empty_read() {
        use std::io::Cursor;

        let s3 = S3::default();
        let body = Cursor::new(b"".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let object = BucketKey::from_key(&bucket, test_key());

        assert_matches!(s3.object_present(object, CheckObjectImpl::List, move || Ok((body, ObjectBodyMeta::default()))).ensure(), Err(S3SyncError::NoBodyError));
    }

    #[test]
    fn test_object_present_progress() {
        let mut s3 = S3::default();
        let body = Cursor::new(b"hello world".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let object = BucketKey::from_key(&bucket, test_key());

        let asserts: Vec<Box<dyn Fn(&TransferStatus)>> = vec![
            Box::new(|t| assert_matches!(t, TransferStatus::Init)),
            Box::new(|t| assert_matches!(t, TransferStatus::Progress(_))),
            Box::new(|t| assert_matches!(t, TransferStatus::Done(_))),
        ];

        let mut asserts = asserts.into_iter();

        s3.with_on_upload_progress(move |t| asserts.next().unwrap()(t), |s3| {
            s3.object_present(object, CheckObjectImpl::Head, move || Ok((body, ObjectBodyMeta::default()))).ensure().or_failed_to("make object present");
        });
    }

    #[test]
    fn test_delete_objects_given() {
        let s3 = S3::default();

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");

        let objects = vec![
            BucketKey::from_key(&bucket, "s3-sync-test/bar-1".to_owned()),
            BucketKey::from_key(&bucket, "s3-sync-test/bar-2".to_owned()),
        ];

        for object in objects.clone() {
            s3.put_object(object, Cursor::new(b"foo bar".to_vec()), ObjectBodyMeta::default()).unwrap();
        }

        let ops = s3.delete_objects(objects).or_failed_to("delete objects").collect::<Vec<_>>();

        assert_eq!(ops.len(), 1);
        assert_eq!(ops[0].len(), 2);

        assert_matches!(&ops[0][0], Ok(Absent(BucketKey { bucket: Present(Bucket { name }), key })) => {
            assert_eq!(name, &s3_test_bucket().name);
            assert_eq!(key, "s3-sync-test/bar-1")
        });

        assert_matches!(&ops[0][1], Ok(Absent(BucketKey { bucket: Present(Bucket { name }), key })) => {
            assert_eq!(name, &s3_test_bucket().name);
            assert_eq!(key, "s3-sync-test/bar-2")
        });
    }

    #[test]
    fn test_delete_objects_from_list() {
        let s3 = S3::default();

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");

        let objects = vec![
            BucketKey::from_key(&bucket, "s3-sync-test/baz-1".to_owned()),
            BucketKey::from_key(&bucket, "s3-sync-test/baz-2".to_owned()),
            BucketKey::from_key(&bucket, "s3-sync-test/bax".to_owned()),
        ];

        for object in objects {
            s3.put_object(object, Cursor::new(b"foo bar".to_vec()), ObjectBodyMeta::default()).unwrap();
        }

        let objects = s3.list_objects(&bucket, "s3-sync-test/baz".to_owned()).or_failed_to("get list of objects");
        let ops = s3.delete_objects(objects).or_failed_to("delete objects").collect::<Vec<_>>();

        assert_eq!(ops.len(), 1);
        assert_eq!(ops[0].len(), 2);

        assert_matches!(&ops[0][0], Ok(Absent(BucketKey { bucket: Present(Bucket { name }), key })) => {
            assert_eq!(name, &s3_test_bucket().name);
            assert_eq!(key, "s3-sync-test/baz-1")
        });

        assert_matches!(&ops[0][1], Ok(Absent(BucketKey { bucket: Present(Bucket { name }), key })) => {
            assert_eq!(name, &s3_test_bucket().name);
            assert_eq!(key, "s3-sync-test/baz-2")
        });
    }

    #[test]
    fn test_object_last_modified() {
        use std::io::Cursor;

        let s3 = S3::default();
        let body = Cursor::new(b"hello world".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let object = BucketKey::from_key(&bucket, test_key());

        let object = s3.object_present(object, CheckObjectImpl::List, move || Ok((body, ObjectBodyMeta::default()))).ensure().or_failed_to("make object present");

        let object = s3.check_object_exists(object.invalidate_state(), CheckObjectImpl::Head).unwrap().unwrap_left();
        assert!(matches!(object.last_modified(), LastModified::Rfc2822(_)));

        let object = s3.check_object_exists(object.unwrap_bucket_key().invalidate_state(), CheckObjectImpl::List).unwrap().unwrap_left();
        assert!(matches!(object.last_modified(), LastModified::Rfc3339(_)));
    }

    #[cfg(feature = "chrono")]
    #[test]
    fn test_object_last_modified_chrono() {
        use std::io::Cursor;

        let s3 = S3::default();
        let body = Cursor::new(b"hello world".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let object = BucketKey::from_key(&bucket, test_key());

        let object = s3.object_present(object, CheckObjectImpl::List, move || Ok((body, ObjectBodyMeta::default()))).ensure().or_failed_to("make object present");

        let object = s3.check_object_exists(object.invalidate_state(), CheckObjectImpl::Head).unwrap().unwrap_left();
        assert!(object.last_modified().parse().is_ok());

        let object = s3.check_object_exists(object.unwrap_bucket_key().invalidate_state(), CheckObjectImpl::List).unwrap().unwrap_left();
        assert!(object.last_modified().parse().is_ok());
    }

    #[test]
    fn test_bucket_lifetimes() {
        fn compiles1<'s>(s3: &'s S3, bucket: String, key: String) -> impl Read + 's {
            let bucket = s3.check_bucket_exists(Bucket::from_name(bucket)).unwrap().left().unwrap();
            let bucket_key = BucketKey::from_key(&bucket, key);

            let present_bucket_key = bucket_key.assume_present();
            let input = s3.get_body_box(&present_bucket_key).unwrap();

            input
        }

        fn compiles2<'s>(s3: &'s S3, bucket: String, key: String) -> impl Read + 's {
            let bucket = s3.check_bucket_exists(Bucket::from_name(bucket)).unwrap().left().unwrap();
            let bucket_key = BucketKey::from_key(&bucket, key);

            let object = s3.check_object_exists(bucket_key, CheckObjectImpl::Head).unwrap().left().unwrap();
            let input = s3.get_body_box(&object).unwrap();

            input
        }

        let s3 = S3::default();
        let body = Cursor::new(b"hello world".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let key = test_key();
        let object = BucketKey::from_key(&bucket, key.clone());

        s3.put_object(object, body, ObjectBodyMeta::default()).unwrap();

        let mut body = Vec::new();
        compiles1(&s3, s3_test_bucket().name().to_owned(), key.clone()).read_to_end(&mut body).unwrap();
        assert_eq!(&body, b"hello world");

        let mut body = Vec::new();
        compiles2(&s3, s3_test_bucket().name().to_owned(), key.clone()).read_to_end(&mut body).unwrap();
        assert_eq!(&body, b"hello world");
    }
}