/*!
High level synchronous S3 client.

This client wraps Rusoto S3 and provides the following features:
* check if a bucket or an object exists,
* list objects that match a prefix as an iterator that handles pagination transparently,
* put large objects via the multipart API and follow progress via a callback,
* delete a single object or any number of objects via the bulk delete API,
* defer execution using the `ensure` crate when putting and deleting objects.

Example usage
=============

```rust
use s3_sync::{S3, Region, ObjectBodyMeta, BucketKey, Bucket};
use std::io::Cursor;
use std::io::Read;

let test_bucket = std::env::var("S3_TEST_BUCKET").expect("S3_TEST_BUCKET not set");
let test_key = "foobar.test";

let s3 = S3::default();

let bucket = s3.check_bucket_exists(Bucket::from_name(test_bucket)).expect("check if bucket exists")
    .left().expect("existing bucket");
let bucket_key = BucketKey::from_key(&bucket, test_key);

let body = Cursor::new(b"hello world".to_vec());
let object = s3.put_object(bucket_key, body, ObjectBodyMeta::default()).unwrap();

let mut body = Vec::new();
s3.get_body(&object).expect("object body").read_to_end(&mut body).unwrap();

assert_eq!(&body, b"hello world");
```

Cargo features
==============

* `chrono` - enables the `parse` method on `LastModified`

*/
use rusoto_s3::{S3Client, HeadObjectOutput};
use rusoto_s3::{DeleteObjectRequest, DeleteObjectsRequest, Delete, ObjectIdentifier, HeadObjectRequest, HeadObjectError, HeadBucketRequest, HeadBucketError};
pub use rusoto_core::region::Region;
use rusoto_core::RusotoError;
use rusoto_core::request::BufferedHttpResponse;
use rusoto_s3::S3 as S3Trait;
use rusoto_s3::{ListObjectsV2Request, ListObjectsV2Output};
use rusoto_s3::Object as S3Object;
use log::{trace, debug, error};
use itertools::unfold;
use std::borrow::Borrow;
use std::time::Duration;
use std::io::Read;
use std::error::Error;
use std::fmt;
use std::collections::{HashMap, HashSet};
use std::cell::RefCell;
use ensure::{Absent, Present, Ensure, Meet, External};
use ensure::CheckEnsureResult::*;
use either::Either;
use Either::*;
use problem::prelude::*;
use itertools::Itertools;
#[cfg(feature = "chrono")]
use chrono::{DateTime, FixedOffset};

// re-export return type crates
pub use ensure;
pub use either;

pub trait Captures1<'i> {}
impl<'i, T> Captures1<'i> for T {}

pub trait Captures2<'i> {}
impl<'i, T> Captures2<'i> for T {}

/// Possible errors in the `s3-sync` library.
#[derive(Debug)]
pub enum S3SyncError {
    RusotoError(RusotoError<Box<dyn Error + 'static>>),
    IoError(std::io::Error),
    NoBodyError,
    MissingObjectMetaData(&'static str),
    #[cfg(feature = "chrono")]
    ChronoError(chrono::ParseError),
}

impl fmt::Display for S3SyncError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            S3SyncError::RusotoError(err) => match err {
                RusotoError::Service(err) => write!(f, "AWS service-specific error occurred: {}", err),
                RusotoError::HttpDispatch(err) => write!(f, "AWS error occurred dispatching the HTTP request: {}", err),
                RusotoError::Credentials(err) => write!(f, "AWS error was encountered with credentials: {}", err),
                RusotoError::Validation(err) => write!(f, "AWS validation error occurred: {}", err),
                RusotoError::ParseError(err) => write!(f, "AWS error occurred parsing the response payload: {}", err),
                RusotoError::Unknown(err @ BufferedHttpResponse { status, .. }) =>
                    if let Some(reason) = status.canonical_reason() {
                        if err.body.is_empty() {
                            write!(f, "AWS HTTP error occurred: {}", reason)
                        } else {
                            write!(f, "AWS HTTP error occurred: {}: {}", reason, err.body_as_str())
                        }
                    } else {
                        write!(f, "unknown AWS HTTP error occurred: {:?}", err)
                    }
            },
            S3SyncError::IoError(_) => write!(f, "local I/O error"),
            S3SyncError::NoBodyError => write!(f, "expected body but found none"),
            S3SyncError::MissingObjectMetaData(meta) => write!(f, "expected object to have {} value but found none", meta),
            #[cfg(feature = "chrono")]
            S3SyncError::ChronoError(_) => write!(f, "error parsing timestamp"),
        }
    }
}

impl Error for S3SyncError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            S3SyncError::RusotoError(_) => None,
            S3SyncError::IoError(err) => Some(err),
            S3SyncError::NoBodyError => None,
            S3SyncError::MissingObjectMetaData(_) => None,
            #[cfg(feature = "chrono")]
            S3SyncError::ChronoError(err) => Some(err),
        }
    }
}

impl<T: Error + 'static> From<RusotoError<T>> for S3SyncError {
    fn from(err: RusotoError<T>) -> S3SyncError {
        match err {
            RusotoError::Service(err) => S3SyncError::RusotoError(RusotoError::Service(Box::new(err))),
            RusotoError::HttpDispatch(err) => S3SyncError::RusotoError(RusotoError::HttpDispatch(err)),
            RusotoError::Credentials(err) => S3SyncError::RusotoError(RusotoError::Credentials(err)),
            RusotoError::Validation(err) => S3SyncError::RusotoError(RusotoError::Validation(err)),
            RusotoError::ParseError(err) => S3SyncError::RusotoError(RusotoError::ParseError(err)),
            RusotoError::Unknown(err) => S3SyncError::RusotoError(RusotoError::Unknown(err)),
        }
    }
}

impl From<std::io::Error> for S3SyncError {
    fn from(err: std::io::Error) -> S3SyncError {
        S3SyncError::IoError(err)
    }
}

#[cfg(feature = "chrono")]
impl From<chrono::ParseError> for S3SyncError {
    fn from(err: chrono::ParseError) -> S3SyncError {
        S3SyncError::ChronoError(err)
    }
}

/// Represents an S3 bucket.
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
pub struct Bucket {
    name: String
}

impl External for Bucket {}

impl Bucket {
    /// Creates a [Bucket] from a bucket name.
    pub fn from_name(name: impl Into<String>) -> Bucket {
        Bucket {
            name: name.into()
        }
    }

    /// Gets the bucket name.
    pub fn name(&self) -> &str {
        &self.name
    }
}

/// Represents a pointer to an object in an S3 bucket.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BucketKey<'b> {
    pub bucket: &'b Present<Bucket>,
    pub key: String,
}

impl External for BucketKey<'_> {}

impl<'b> BucketKey<'b> {
    /// Creates a [BucketKey] from a present [Bucket] and the given key.
    pub fn from_key(bucket: &Present<Bucket>, key: impl Into<String>) -> BucketKey {
        BucketKey {
            bucket,
            key: key.into(),
        }
    }

    /// Gets the bucket name.
    pub fn bucket_name(&self) -> &str {
        self.bucket.name()
    }

    /// Gets the key.
    pub fn key(&self) -> &str {
        &self.key
    }

    /// Gets the present bucket.
    pub fn bucket(&self) -> &Present<Bucket> {
        self.bucket
    }

    /// Assumes that the object exists in S3 without making an API call to verify it.
    pub fn assume_present(self) -> Present<BucketKey<'b>> {
        Present(self)
    }
}

impl<'b> From<Object<'b>> for BucketKey<'b> {
    fn from(obj: Object<'b>) -> BucketKey<'b> {
        obj.bucket_key.0
    }
}

impl<'b> From<Object<'b>> for Present<BucketKey<'b>> {
    fn from(obj: Object<'b>) -> Present<BucketKey<'b>> {
        obj.bucket_key
    }
}

impl<'b> Borrow<Present<BucketKey<'b>>> for Object<'b> {
    fn borrow(&self) -> &Present<BucketKey<'b>> {
        self.bucket_key()
    }
}

impl<'b> From<Present<BucketKey<'b>>> for BucketKey<'b> {
    fn from(obj: Present<BucketKey<'b>>) -> BucketKey<'b> {
        obj.0
    }
}

impl<'b> From<Absent<BucketKey<'b>>> for BucketKey<'b> {
    fn from(obj: Absent<BucketKey<'b>>) -> BucketKey<'b> {
        obj.0
    }
}

/// Represents an existing object and its metadata in S3.
#[derive(Debug, PartialEq, Eq)]
pub struct Object<'b> {
    pub bucket_key: Present<BucketKey<'b>>,
    pub size: i64,
    pub e_tag: String,
    pub last_modified: LastModified,
}

impl<'b> Object<'b> {
    fn from_head(
        bucket_key: BucketKey<'b>,
        res: HeadObjectOutput,
    ) -> Result<Object, S3SyncError> {
        Ok(Object {
            bucket_key: Present(bucket_key),
            e_tag: res.e_tag.ok_or(S3SyncError::MissingObjectMetaData("e_tag"))?,
            last_modified: res.last_modified.map(LastModified::Rfc2822)
                .ok_or(S3SyncError::MissingObjectMetaData("last_modified"))?,
            size: res.content_length.ok_or(S3SyncError::MissingObjectMetaData("content_length"))?,
        })
    }

    fn from_s3_object(bucket: &Present<Bucket>, object: S3Object) -> Result<Object, S3SyncError> {
        Ok(Object {
            bucket_key: Present(BucketKey::from_key(bucket, object.key
                .ok_or(S3SyncError::MissingObjectMetaData("key"))?)),
            e_tag: object.e_tag.ok_or(S3SyncError::MissingObjectMetaData("e_tag"))?,
            last_modified: object.last_modified.map(LastModified::Rfc3339)
                .ok_or(S3SyncError::MissingObjectMetaData("last_modified"))?,
            size: object.size.ok_or(S3SyncError::MissingObjectMetaData("size"))?,
        })
    }

    /// Gets the [BucketKey] pointing to this object.
    pub fn bucket_key(&self) -> &Present<BucketKey<'b>> {
        &self.bucket_key
    }

    /// Unwraps the inner [BucketKey] pointing to this object.
    pub fn unwrap_bucket_key(self) -> Present<BucketKey<'b>> {
        self.bucket_key
    }

    /// Gets the object size.
    pub fn size(&self) -> i64 {
        self.size
    }

    /// Gets the object ETag.
    pub fn e_tag(&self) -> &str {
        &self.e_tag
    }

    /// Gets the object last modified time.
    pub fn last_modified(&self) -> &LastModified {
        &self.last_modified
    }
}

/// Existing pair of bucket name and key strings.
pub trait PresentBucketKeyName {
    /// Gets the bucket name.
    fn bucket_name(&self) -> &str;
    /// Gets the key.
    fn key(&self) -> &str;
}

impl<'b> PresentBucketKeyName for Present<BucketKey<'b>> {
    fn bucket_name(&self) -> &str {
        self.bucket.name()
    }

    fn key(&self) -> &str {
        &self.key
    }
}

impl<'b> PresentBucketKeyName for Object<'b> {
    fn bucket_name(&self) -> &str {
        self.bucket_key.bucket_name()
    }

    fn key(&self) -> &str {
        self.bucket_key.key()
    }
}

/// Represents the last modified date and time.
///
/// Depending on the source of the information it will be in a different format.
#[derive(Debug, PartialEq, Eq)]
pub enum LastModified {
    /// Date and time in RFC 2822 format.
    Rfc2822(String),
    /// Date and time in RFC 3339 format.
    Rfc3339(String),
}

impl fmt::Display for LastModified {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.as_str())
    }
}

impl LastModified {
    /// Gets the string representation, which may be in either format.
    pub fn as_str(&self) -> &str {
        match self {
            LastModified::Rfc2822(dt) => dt,
            LastModified::Rfc3339(dt) => dt,
        }
    }

    #[cfg(feature = "chrono")]
    /// Returns the parsed date and time.
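    ///
    /// A minimal sketch of parsing (requires the `chrono` feature; the timestamp value is illustrative):
    ///
    /// ```no_run
    /// use s3_sync::LastModified;
    ///
    /// let last_modified = LastModified::Rfc3339("2020-01-01T00:00:00Z".to_owned());
    /// let date_time = last_modified.parse().expect("valid timestamp");
    /// assert_eq!(date_time.timestamp(), 1577836800);
    /// ```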
    pub fn parse(&self) -> Result<DateTime<FixedOffset>, S3SyncError> {
        Ok(match &self {
            LastModified::Rfc2822(lm) => DateTime::parse_from_rfc2822(lm),
            LastModified::Rfc3339(lm) => DateTime::parse_from_rfc3339(lm),
        }?)
    }
}

struct PaginationIter<RQ, RS, SSA, GSA, FF, E>
where RQ: Clone, SSA: Fn(&mut RQ, String), GSA: Fn(&RS) -> Option<String>, FF: Fn(RQ) -> Result<RS, E> {
    request: RQ,
    // functions that parametrise the request to fetch the next page
    set_start_after: SSA,
    get_start_after: GSA,
    fetch: FF,
    done: bool
}

impl<RQ, RS, SSA, GSA, FF, E> Iterator for PaginationIter<RQ, RS, SSA, GSA, FF, E>
where RQ: Clone, SSA: Fn(&mut RQ, String), GSA: Fn(&RS) -> Option<String>, FF: Fn(RQ) -> Result<RS, E> {
    type Item = Result<RS, E>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None
        }
        Some((self.fetch)(self.request.clone()).map(|response| {
            if let Some(start_after) = (self.get_start_after)(&response) {
                (self.set_start_after)(&mut self.request, start_after);
            } else {
                self.done = true;
            }
            response
        }))
    }
}

/// Information about the status of an ongoing transfer.
#[derive(Debug)]
pub enum TransferStatus {
    /// Initialization successful.
    Init,
    /// Transfer is ongoing.
    Progress(TransferStats),
    /// Transfer completed successfully.
    Done(TransferStats),
    /// There was an error.
    Failed(String),
}

impl Default for TransferStatus {
    fn default() -> Self {
        TransferStatus::Init
    }
}

impl TransferStatus {
    fn update(&mut self, stats: TransferStats) {
        match self {
            TransferStatus::Init => {
                *self = TransferStatus::Progress(stats);
            }
            TransferStatus::Progress(ref mut s) => {
                s.buffers += stats.buffers;
                s.bytes = stats.bytes;
                s.bytes_total += stats.bytes_total;
            },
            _ => panic!("TransferStatus in bad state for .update(): {:?}", self),
        }
    }

    fn done(self) -> Self {
        match self {
            TransferStatus::Progress(stats) => TransferStatus::Done(stats),
            _ => panic!("TransferStatus in bad state for .done(): {:?}", self),
        }
    }

    fn failed(self, err: String) -> Self {
        match self {
            TransferStatus::Init |
            TransferStatus::Progress(_) => TransferStatus::Failed(err),
            _ => panic!("TransferStatus in bad state for .failed(): {:?}", self),
        }
    }
}

/// Information about transfer progress.
#[derive(Debug, Default)]
pub struct TransferStats {
    /// Number of buffers or parts transferred.
    pub buffers: u16,
    /// Number of bytes transferred since the last progress update.
    pub bytes: u64,
    /// Total number of bytes transferred.
    pub bytes_total: u64,
}

/// Meta information about an object body.
#[derive(Debug)]
pub struct ObjectBodyMeta {
    /// A standard MIME type describing the format of the object data.
    pub content_type: String,
    /// Specifies presentational information for the object.
    pub content_disposition: Option<String>,
    /// The language the content is in.
    pub content_language: Option<String>,
}

impl Default for ObjectBodyMeta {
    fn default() -> ObjectBodyMeta {
        ObjectBodyMeta {
            content_type: "application/octet-stream".to_owned(),
            content_disposition: None,
            content_language: None,
        }
    }
}

/// Method used for checking if a given object exists.
#[derive(Debug)]
pub enum CheckObjectImpl {
    /// Requires `GetObject` AWS permission.
    Head,
    /// Requires `ListBucket` AWS permission.
    List,
}

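/// Client settings controlling buffer sizes, timeouts and API limits.
///
/// A sketch of customizing settings (the values below are illustrative, not recommendations):
///
/// ```no_run
/// use s3_sync::{S3, Settings};
/// use std::time::Duration;
///
/// let settings = Settings {
///     part_size: 64 * 1024 * 1024, // larger parts raise the maximum upload size
///     data_timeout: Duration::from_secs(600),
///     .. Default::default()
/// };
/// let s3 = S3::new(None, None, settings);
/// ```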
#[derive(Debug)]
pub struct Settings {
    /// Size of a multipart upload part.
    ///
    /// Note: On AWS S3 the part size must be between 5 MiB and 5 GiB.
    pub part_size: usize,
    /// Timeout for non data related operations.
    pub timeout: Duration,
    /// Timeout for data upload/download operations.
    pub data_timeout: Duration,
    /// Maximum number of multipart upload parts (used in the calculation of [S3::max_upload_size()]; the AWS limit is 10k).
    pub max_multipart_upload_parts: usize,
    /// Maximum number of objects that can be deleted with one bulk API call (the AWS limit is 1k).
    pub max_delete_objects: usize,
}

impl Default for Settings {
    fn default() -> Settings {
        Settings {
            part_size: 10 * 1024 * 1024, // note that the max part count is 10k so we can upload up to 100_000 MiB
            timeout: Duration::from_secs(10),
            data_timeout: Duration::from_secs(300),
            max_multipart_upload_parts: 10_000, // AWS S3 limit
            max_delete_objects: 1_000, // AWS S3 limit
        }
    }
}

/// Wrapper of the Rusoto S3 client that adds some high level imperative and declarative operations on
/// S3 buckets and objects.
pub struct S3 {
    client: S3Client,
    on_upload_progress: Option<RefCell<Box<dyn FnMut(&TransferStatus)>>>,
    part_size: usize,
    timeout: Duration,
    data_timeout: Duration,
    max_multipart_upload_parts: usize,
    max_delete_objects: usize,
}

impl fmt::Debug for S3 {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("S3")
         .field("part_size", &self.part_size)
         .field("timeout", &self.timeout)
         .field("data_timeout", &self.data_timeout)
         .finish()
    }
}

impl Default for S3 {
    fn default() -> S3 {
        S3::new(None, None, None)
    }
}

impl S3 {
    /// Creates a high level S3 client.
    ///
    /// * `region` - the AWS region to connect to; when `None` the region is autodetected (see [Region] for details)
    /// * `region_endpoint` - use a dedicated AWS endpoint within the region
    /// * `settings` - use specific client settings
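    ///
    /// A sketch of connecting to a custom endpoint (the URL is a hypothetical local S3-compatible server):
    ///
    /// ```no_run
    /// use s3_sync::{S3, Region};
    ///
    /// // Use us-east-1 signing but talk to a local endpoint.
    /// let s3 = S3::new(Region::UsEast1, "http://localhost:9000".to_owned(), None);
    /// ```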
    pub fn new(
        region: impl Into<Option<Region>>,
        region_endpoint: impl Into<Option<String>>,
        settings: impl Into<Option<Settings>>
    ) -> S3 {
        let region = match (region.into(), region_endpoint.into()) {
            (Some(region), Some(endpoint)) => Region::Custom { name: region.name().to_owned(), endpoint },
            (None, Some(endpoint)) => Region::Custom { name: Region::default().name().to_owned(), endpoint },
            (Some(region), None) => region,
            _ => Region::default(),
        };
        let settings = settings.into().unwrap_or_default();

        S3 {
            client: S3Client::new(region),
            on_upload_progress: None,
            part_size: settings.part_size,
            timeout: settings.timeout,
            data_timeout: settings.data_timeout,
            max_multipart_upload_parts: settings.max_multipart_upload_parts,
            max_delete_objects: settings.max_delete_objects,
        }
    }

    /// Creates a high level S3 client with the given region and default settings.
    pub fn with_region(region: Region) -> S3 {
        S3::new(region, None, None)
    }

    /// Gets the maximum size of a multipart upload part.
    ///
    /// Useful to set up other I/O buffers accordingly.
    pub fn part_size(&self) -> usize {
        self.part_size
    }

    /// Returns the maximum size of data in bytes that can be uploaded to a single object with the current settings.
    pub fn max_upload_size(&self) -> usize {
        self.part_size * self.max_multipart_upload_parts
    }

    /// Sets a callback on body upload progress.
    pub fn on_upload_progress(
        &mut self,
        callback: impl FnMut(&TransferStatus) + 'static
    ) -> Option<Box<dyn FnMut(&TransferStatus)>> {
        let ret = self.on_upload_progress.take();
        self.on_upload_progress = Some(RefCell::new(Box::new(callback)));
        ret.map(|c| c.into_inner())
    }

    /// Calls `f` with an [S3] client that has [S3::on_upload_progress()] set to `callback` and restores
    /// the callback to its previous state on return.
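    ///
    /// A sketch of printing progress during an upload (assumes `S3_TEST_BUCKET` is set and AWS credentials are configured):
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket, BucketKey, ObjectBodyMeta, TransferStatus};
    /// use std::io::Cursor;
    ///
    /// let mut s3 = S3::default();
    /// let bucket = s3.check_bucket_exists(Bucket::from_name(std::env::var("S3_TEST_BUCKET").unwrap()))
    ///     .expect("check if bucket exists").left().expect("existing bucket");
    ///
    /// s3.with_on_upload_progress(|status| {
    ///     if let TransferStatus::Progress(stats) = status {
    ///         println!("uploaded {} bytes so far", stats.bytes_total);
    ///     }
    /// }, |s3| {
    ///     let body = Cursor::new(b"hello world".to_vec());
    ///     s3.put_object(BucketKey::from_key(&bucket, "example.txt"), body, ObjectBodyMeta::default())
    /// }).expect("upload");
    /// ```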
    pub fn with_on_upload_progress<O>(
        &mut self,
        callback: impl FnMut(&TransferStatus) + 'static,
        f: impl FnOnce(&mut Self) -> O
    ) -> O {
        let old = self.on_upload_progress(callback);
        let ret = f(self);
        old.map(|callback| self.on_upload_progress(callback));
        ret
    }

    fn notify_upload_progress(&self, status: &TransferStatus) {
        self.on_upload_progress.as_ref().map(|c| {
            c.try_borrow_mut().expect("S3 upload_progress closure already borrowed mutably").as_mut()(status);
        });
    }

    /// Checks if the given bucket exists.
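    ///
    /// A sketch of handling both outcomes (assumes AWS credentials are configured; the bucket name is illustrative):
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket};
    /// use s3_sync::either::Either::{Left, Right};
    ///
    /// let s3 = S3::default();
    /// match s3.check_bucket_exists(Bucket::from_name("my-bucket")).expect("check if bucket exists") {
    ///     Left(present) => println!("bucket {} exists", present.name()),
    ///     Right(absent) => println!("bucket {} does not exist", absent.0.name()),
    /// }
    /// ```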
    pub fn check_bucket_exists(&self, bucket: Bucket) -> Result<Either<Present<Bucket>, Absent<Bucket>>, S3SyncError> {
        let res = self.client.head_bucket(HeadBucketRequest {
            bucket: bucket.name.clone(),
            .. Default::default()
        }).with_timeout(self.timeout).sync();
        trace!("Head bucket response: {:?}", res);

        match res {
            Ok(_) => Ok(Left(Present(bucket))),
            Err(RusotoError::Service(HeadBucketError::NoSuchBucket(_))) => Ok(Right(Absent(bucket))),
            Err(RusotoError::Unknown(BufferedHttpResponse { status, .. })) if status.as_u16() == 404 => Ok(Right(Absent(bucket))),
            Err(err) => Err(err.into())
        }
    }

    /// Checks if the given object exists.
    ///
    /// * `implementation` - selects the implementation of this function
    ///
    /// Note:
    ///
    /// * The [Object::last_modified()] value will be in a different format depending on the implementation.
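    ///
    /// A sketch of checking for an object (assumes `S3_TEST_BUCKET` is set and AWS credentials are configured; the key is illustrative):
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket, BucketKey, CheckObjectImpl};
    ///
    /// let s3 = S3::default();
    /// let bucket = s3.check_bucket_exists(Bucket::from_name(std::env::var("S3_TEST_BUCKET").unwrap()))
    ///     .expect("check if bucket exists").left().expect("existing bucket");
    ///
    /// let bucket_key = BucketKey::from_key(&bucket, "foobar.test");
    /// if let Some(object) = s3.check_object_exists(bucket_key, CheckObjectImpl::Head).expect("check if object exists").left() {
    ///     println!("object is {} bytes, ETag {}", object.size(), object.e_tag());
    /// }
    /// ```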
    pub fn check_object_exists<'s, 'b>(&'s self, bucket_key: BucketKey<'b>, implementation: CheckObjectImpl)
        -> Result<Either<Object<'b>, Absent<BucketKey<'b>>>, S3SyncError> {
        match implementation {
            CheckObjectImpl::List => self.check_object_exists_list(bucket_key),
            CheckObjectImpl::Head => self.check_object_exists_head(bucket_key),
        }
    }

    /// Checks if the given object exists by issuing a `HeadObject` API request.
    ///
    /// Note:
    ///
    /// * Requires `GetObject` AWS permission.
    /// * The [Object::last_modified()] value will be in RFC 2822 format.
    pub fn check_object_exists_head<'s, 'b>(&'s self, bucket_key: BucketKey<'b>)
        -> Result<Either<Object<'b>, Absent<BucketKey<'b>>>, S3SyncError> {
        let res = self.client.head_object(HeadObjectRequest {
            bucket: bucket_key.bucket.name.clone(),
            key: bucket_key.key.clone(),
            .. Default::default()
        }).with_timeout(self.timeout).sync();
        trace!("Head response: {:?}", res);

        match res {
            Ok(res) => Ok(Left(Object::from_head(bucket_key, res)?)),
            Err(RusotoError::Service(HeadObjectError::NoSuchKey(_))) =>
                Ok(Right(Absent(bucket_key))),
            Err(RusotoError::Unknown(BufferedHttpResponse { status, .. })) if status.as_u16() == 404 =>
                Ok(Right(Absent(bucket_key))),
            Err(err) => Err(err.into())
        }
    }

    /// Checks if the given object exists by listing objects with a `ListObjectsV2` API request.
    ///
    /// Note:
    ///
    /// * Requires `ListBucket` AWS permission.
    /// * The [Object::last_modified()] value will be in RFC 3339 format.
    pub fn check_object_exists_list<'s, 'b>(&'s self, bucket_key: BucketKey<'b>)
        -> Result<Either<Object<'b>, Absent<BucketKey<'b>>>, S3SyncError> {
        let request = ListObjectsV2Request {
            bucket: bucket_key.bucket.name().to_owned(),
            prefix: Some(bucket_key.key.clone()),
            max_keys: Some(1),
            .. Default::default()
        };

        let res = self.client.list_objects_v2(request).with_timeout(self.timeout).sync()?;
        let first_obj = res.contents
            .and_then(|list| list.into_iter().next());
        match first_obj {
            Some(obj) if obj.key.as_deref().expect("S3 object has no key!") == bucket_key.key =>
                Ok(Left(Object::from_s3_object(bucket_key.bucket, obj)?)),
            _ => Ok(Right(Absent(bucket_key)))
        }
    }

    /// Provides an iterator of objects in an existing bucket that have keys with the given prefix.
    ///
    /// Note:
    ///
    /// * Requires `ListBucket` AWS permission.
    /// * The [Object::last_modified()] value will be in RFC 3339 format.
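    ///
    /// A sketch of listing objects under a prefix (assumes `S3_TEST_BUCKET` is set and AWS credentials are configured; the prefix is illustrative):
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket, PresentBucketKeyName};
    ///
    /// let s3 = S3::default();
    /// let bucket = s3.check_bucket_exists(Bucket::from_name(std::env::var("S3_TEST_BUCKET").unwrap()))
    ///     .expect("check if bucket exists").left().expect("existing bucket");
    ///
    /// // Pagination is handled transparently; each item is a Result.
    /// for object in s3.list_objects(&bucket, "logs/".to_owned()) {
    ///     let object = object.expect("list object");
    ///     println!("{} ({} bytes)", object.key(), object.size());
    /// }
    /// ```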
    pub fn list_objects<'b, 's: 'b>(&'s self, bucket: &'b Present<Bucket>, prefix: String)
        -> impl Iterator<Item = Result<Object<'b>, S3SyncError>> + Captures1<'s> + Captures2<'b> {
        let client = &self.client;
        let pages = PaginationIter {
            request: ListObjectsV2Request {
                bucket: bucket.name().to_owned(),
                prefix: Some(prefix),
                .. Default::default()
            },
            set_start_after: |request: &mut ListObjectsV2Request, start_after| {
                request.start_after = Some(start_after);
            },
            get_start_after: |response: &ListObjectsV2Output| {
                response.contents.as_ref()
                    .and_then(|objects| objects.last().and_then(|last| last.key.clone()))
            },
            fetch: move |request: ListObjectsV2Request| {
                client.list_objects_v2(request).with_timeout(self.timeout).sync().map_err(Into::into)
            },
            done: false
        };

        pages.flat_map(move |response| {
            let mut error = None;
            let mut objects = None;
            match response {
                Err(err) => error = Some(err),
                Ok(output) => objects = output.contents.map(|objects| objects.into_iter()),
            }

            unfold((), move |_| {
                if let Some(error) = error.take() {
                    Some(Err(error))
                } else {
                    objects.as_mut().and_then(|obj| obj.next()).map(|o| Ok(Object::from_s3_object(bucket, o)?))
                }
            })
        })
    }

    fn _get_body(&self, bucket: String, key: String) -> Result<impl Read + Send + '_, S3SyncError> {
        use rusoto_s3::GetObjectRequest;
        let req = GetObjectRequest {
            bucket,
            key,
            .. Default::default()
        };
        self.client.get_object(req)
            .with_timeout(self.data_timeout)
            .sync()
            .map_err(Into::into)
            .and_then(|output| output.body.ok_or(S3SyncError::NoBodyError))
            .map(|body| body.into_blocking_read())
    }

    /// Gets the object body.
    pub fn get_body(&self, bucket_key: &impl PresentBucketKeyName) -> Result<impl Read + Send + '_, S3SyncError> {
        let bucket = bucket_key.bucket_name().to_owned();
        let key = bucket_key.key().to_owned();
        self._get_body(bucket, key)
    }

    /// Gets the object body boxed.
    ///
    /// Note: This is provided as a workaround for "impl Trait return type capturing all input lifetimes" if needed.
    pub fn get_body_box(&self, bucket_key: &impl PresentBucketKeyName) -> Result<Box<dyn Read + Send + '_>, S3SyncError> {
        let bucket = bucket_key.bucket_name().to_owned();
        let key = bucket_key.key().to_owned();
        self._get_body(bucket, key).map(|r| Box::new(r) as Box<dyn Read + Send>)
    }

    /// Gets the object body, retrying the operation in case of an error.
    ///
    /// * `retries` - retry the `get_body` call up to that many times
    /// * `on_error` - called when the `get_body` call fails and there are still retries left;
    ///   it gets the number of retries left and the error; if it returns `false` the retry loop is
    ///   aborted
    ///
    /// Note:
    ///
    /// * The `on_error` closure may need to pause the execution of the thread to delay the next retry attempt.
    /// * Once this function returns, subsequent read operation failures are not retried.
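    ///
    /// A sketch of retrying with a fixed delay (assumes a present object and configured credentials; the retry count and delay are illustrative):
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket, BucketKey};
    /// use std::io::Read;
    /// use std::time::Duration;
    ///
    /// let s3 = S3::default();
    /// let bucket = s3.check_bucket_exists(Bucket::from_name(std::env::var("S3_TEST_BUCKET").unwrap()))
    ///     .expect("check if bucket exists").left().expect("existing bucket");
    /// let bucket_key = BucketKey::from_key(&bucket, "foobar.test").assume_present();
    ///
    /// let mut body = Vec::new();
    /// s3.get_body_with_retry(&bucket_key, 3, |retries_left, err| {
    ///     eprintln!("get_body failed ({} retries left): {}", retries_left, err);
    ///     std::thread::sleep(Duration::from_secs(1)); // delay the next attempt
    ///     true // keep retrying
    /// }).expect("object body").read_to_end(&mut body).unwrap();
    /// ```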
    pub fn get_body_with_retry<'s, F>(
        &'s self, bucket_key: &impl PresentBucketKeyName,
        mut retries: u32,
        on_error: F
    ) -> Result<impl Read + Send + 's, S3SyncError>
    where F: Fn(u32, &S3SyncError) -> bool {
        let bucket = bucket_key.bucket_name().to_owned();
        let key = bucket_key.key().to_owned();

        loop {
            match self._get_body(bucket.clone(), key.clone()) {
                Ok(body) => return Ok(body),
                Err(err) => {
                    if retries == 0 || !on_error(retries, &err) {
                        return Err(err)
                    }
                    retries -= 1;
                }
            }
        }
    }

    /// Creates the S3 object with the given body using the multipart upload API.
    ///
    /// Warning: An existing object will be overwritten (subject to bucket versioning settings).
    ///
    /// The size of the body is limited to the value returned by [S3::max_upload_size()].
    /// Increase [Settings::part_size] to be able to upload more data (`max_upload_size = part_size *
    /// 10_000` on AWS; with default settings the limit is 100_000 MiB).
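    ///
    /// A sketch of uploading a file (assumes `S3_TEST_BUCKET` is set, AWS credentials are configured and `input.bin` exists):
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket, BucketKey, ObjectBodyMeta};
    /// use std::fs::File;
    ///
    /// let s3 = S3::default();
    /// let bucket = s3.check_bucket_exists(Bucket::from_name(std::env::var("S3_TEST_BUCKET").unwrap()))
    ///     .expect("check if bucket exists").left().expect("existing bucket");
    ///
    /// let body = File::open("input.bin").expect("open input file");
    /// let object = s3.put_object(BucketKey::from_key(&bucket, "input.bin"), body, ObjectBodyMeta::default())
    ///     .expect("upload object");
    /// ```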
    pub fn put_object<'s, 'b>(
        &'s self,
        bucket_key: impl Into<BucketKey<'b>>,
        mut body: impl Read,
        meta: ObjectBodyMeta
    ) -> Result<Present<BucketKey<'b>>, S3SyncError> {
        use rusoto_s3::{CreateMultipartUploadRequest, UploadPartRequest, CompleteMultipartUploadRequest, AbortMultipartUploadRequest, CompletedMultipartUpload, CompletedPart};
        use rusoto_core::ByteStream;

        let bucket_key = bucket_key.into();
        let bucket_name = bucket_key.bucket.name.clone();
        let object_key = bucket_key.key.clone();

        let upload_id = self.client.create_multipart_upload(CreateMultipartUploadRequest {
            bucket: bucket_name.clone(),
            key: object_key.clone(),
            content_type: Some(meta.content_type),
            content_disposition: meta.content_disposition,
            content_language: meta.content_language,
            .. Default::default()
        })
        .with_timeout(self.timeout).sync()?
        .upload_id.expect("no upload ID");

        debug!("Started multipart upload {:?}", upload_id);

        let mut completed_parts = Vec::new();
        let mut progress = TransferStatus::default();

        // Notify progress init
        self.notify_upload_progress(&progress);

        let result = || -> Result<_, S3SyncError> {
            let body = &mut body;

            for part_number in 1u16.. {
                // Note that S3 does not support chunked upload of a single part so we need to send
                // the full part at once. The best thing to do here would be to use Bytes directly to
                // avoid an allocation per part upload...
                // We allocate one byte more to avoid a re-allocation, as read_to_end needs spare
                // capacity before it hits EOF.
                let mut buf = Vec::with_capacity(self.part_size + 1);
                let bytes = body.take(self.part_size as u64).read_to_end(&mut buf)?;

                // Don't create 0 byte parts on EOF
                if bytes == 0 {
                    break
                }

                debug!("Uploading part {} ({} bytes)", part_number, bytes);
                let result = self.client.upload_part(UploadPartRequest {
                    body: Some(ByteStream::from(buf)),
                    bucket: bucket_name.clone(),
                    key: object_key.clone(),
                    part_number: part_number as i64,
                    upload_id: upload_id.clone(),
                    .. Default::default()
                }).with_timeout(self.data_timeout).sync()?;

                completed_parts.push(CompletedPart {
                    e_tag: result.e_tag,
                    part_number: Some(part_number as i64),
                });

                progress.update(TransferStats {
                    buffers: 1,
                    bytes: bytes as u64,
                    bytes_total: bytes as u64,
                });

                // Notify with progress
                self.notify_upload_progress(&progress);
            }

            // No parts uploaded
            if completed_parts.is_empty() {
                return Err(S3SyncError::NoBodyError)
            }

            debug!("Completing multipart upload {:?}", upload_id);
            self.client.complete_multipart_upload(CompleteMultipartUploadRequest {
                bucket: bucket_name.clone(),
                key: object_key.clone(),
                upload_id: upload_id.clone(),
                multipart_upload: Some(CompletedMultipartUpload {
                    parts: Some(completed_parts)
                }),
                .. Default::default()
            }).with_timeout(self.timeout).sync()?;

            Ok(Present(bucket_key))
        }();

        if let Err(err) = &result {
            let err = Problem::from_error_message(err).to_string();
            error!("Aborting multipart upload {:?} due to error: {}", upload_id, err);
            self.client.abort_multipart_upload(AbortMultipartUploadRequest {
                bucket: bucket_name,
                key: object_key,
                upload_id,
                .. Default::default()
            }).with_timeout(self.timeout).sync().ok_or_log_warn();

            // Notify that it has failed
            self.notify_upload_progress(&progress.failed(err));
        } else {
            // Notify that it is done
            self.notify_upload_progress(&progress.done());
        }

        result
    }

    /// Deletes a single object.
    ///
    /// Note: The delete call does not fail if the object does not exist.
    ///
    /// To delete many objects use [S3::delete_objects()] which uses the bulk delete API.
    pub fn delete_object<'b, 's: 'b>(
        &'s self,
        bucket_key: impl Into<BucketKey<'b>>
    ) -> Result<Absent<BucketKey<'b>>, S3SyncError> {
        let bucket_key = bucket_key.into();
        debug!("Deleting object {:?} from S3 bucket {:?}", bucket_key.key, bucket_key.bucket.name);
        let res = self.client.delete_object(DeleteObjectRequest {
            bucket: bucket_key.bucket.name.clone(),
            key: bucket_key.key.clone(),
            .. Default::default()
        }).with_timeout(self.timeout).sync()?;
        trace!("Delete response: {:?}", res);

        Ok(Absent(bucket_key))
    }

    /// Deletes a list of objects in a streaming fashion using the bulk delete API.
    ///
    /// Warning: If the returned iterator is not completely consumed, not all items from the list may
    /// be deleted.
    ///
    /// It is not an error to delete a non-existing S3 object.
    ///
    /// Objects can live in different buckets but for best performance it is
    /// recommended to order the list by bucket so that the biggest batches can be created.
    ///
    /// Each returned item represents a batch delete call to the S3 API.
    ///
    /// A successful batch call will return a [Result::Ok] variant containing a vector of results for each
    /// individual object delete operation as provided by S3.
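    ///
    /// A sketch of bulk deleting everything under a prefix (assumes `S3_TEST_BUCKET` is set and AWS credentials are configured; the prefix is illustrative):
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket};
    ///
    /// let s3 = S3::default();
    /// let bucket = s3.check_bucket_exists(Bucket::from_name(std::env::var("S3_TEST_BUCKET").unwrap()))
    ///     .expect("check if bucket exists").left().expect("existing bucket");
    ///
    /// let objects = s3.list_objects(&bucket, "tmp/".to_owned()).map(|object| object.expect("list object"));
    /// // Each iterator item is one bulk delete batch; consume the iterator fully to delete everything.
    /// for batch in s3.delete_objects(objects) {
    ///     for result in batch.expect("delete batch") {
    ///         result.expect("delete object");
    ///     }
    /// }
    /// ```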
    pub fn delete_objects<'b, 's: 'b>(
        &'s self,
        bucket_keys: impl IntoIterator<Item = impl Into<BucketKey<'b>>>
    ) -> impl Iterator<Item = Result<Vec<Result<Absent<BucketKey<'b>>, (BucketKey<'b>, S3SyncError)>>, S3SyncError>> + Captures1<'s> + Captures2<'b> {
        let max_delete_objects = self.max_delete_objects;
        bucket_keys
            .into_iter()
            .map(|o| o.into())
            .peekable()
            .batching(move |bucket_keys| {
                let current_bucket_name = if let Some(bucket_key) = bucket_keys.peek() {
                    bucket_key.bucket.name.clone()
                } else {
                    return None
                };

                Some((current_bucket_name.clone(),
                     bucket_keys
                        .peeking_take_while(move |object| object.bucket.name == current_bucket_name)
                        .take(max_delete_objects)
                        .collect::<Vec<_>>()))
            })
            .map(move |(current_bucket_name, bucket_keys): (_, Vec<_>)| {
                debug!("Deleting batch of {} objects from S3 bucket {:?}", bucket_keys.len(), current_bucket_name);
                let res = self.client.delete_objects(DeleteObjectsRequest {
                    bucket: current_bucket_name.clone(),
                    delete: Delete {
                        objects: bucket_keys.iter().map(|bucket_key| ObjectIdentifier {
                            key: bucket_key.key.clone(),
                            .. Default::default()
                        }).collect::<Vec<_>>(),
                        .. Default::default()
                    }, .. Default::default()
                }).with_timeout(self.timeout).sync()?;
                trace!("Delete response: {:?}", res);

                let ok_objects =
                if let Some(deleted) = res.deleted {
                    debug!("Deleted {} objects", deleted.len());
                    deleted.into_iter().map(|deleted| {
                        if let Some(key) = deleted.key {
                            Ok(key)
                        } else {
                            Err(S3SyncError::RusotoError(RusotoError::Validation("got S3 deleted object confirmation but no key information".to_owned())))
                        }
                    }).collect::<Result<HashSet<_>, _>>()?
                } else {
                    Default::default()
                };

                let mut failed_objects =
                if let Some(errors) = res.errors {
                    errors.into_iter().map(|error| {
                        error!("Error deleting S3 object {:?}: {}",
                            error.key.as_ref().map(|s| s.as_str()).unwrap_or("<None>"),
                            error.message.as_ref().map(|s| s.as_str()).unwrap_or("<None>"));

                        // Try the best to get failed objects out of OK objects along with the error
                        // message
                        if let (Some(key), Some(error)) = (error.key, error.message) {
                            Ok((key, S3SyncError::RusotoError(RusotoError::Validation(error))))
                        } else {
                            Err(S3SyncError::RusotoError(RusotoError::Validation("got S3 delete object errors but no key or message information".to_owned())))
                        }
                    }).collect::<Result<HashMap<_, _>, _>>()?
                } else {
                    Default::default()
                };

                Ok(bucket_keys.into_iter().map(|k| {
                    if ok_objects.contains(&k.key) {
                        Ok(Absent(k))
                    } else if let Some(err) = failed_objects.remove(&k.key) {
                        Err((k, err))
                    } else {
                        Err((k, S3SyncError::RusotoError(RusotoError::Validation("S3 did not report this object as deleted or failed to be deleted".to_owned()))))
                    }
                }).collect::<Vec<_>>()) // Result<Vec<Result<Absent<BucketKey>, (BucketKey, S3SyncError)>>, S3SyncError>
            })
    }

    /// Returns an [Ensure] value that can be used to ensure that an object is present in the S3 bucket.
    ///
    /// If the S3 object does not exist, this method will call the `body` function to obtain [Read] and metadata values,
    /// and the data read will be uploaded to the new S3 object with the given metadata set on it.
    ///
    /// Warning: There can be a race condition between the check if the object exists and the upload creating it.
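    ///
    /// A sketch of ensuring an object exists (assumes `S3_TEST_BUCKET` is set and AWS credentials are configured; the key is illustrative):
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket, BucketKey, CheckObjectImpl, ObjectBodyMeta, PresentBucketKeyName};
    /// use s3_sync::ensure::Ensure;
    /// use std::io::Cursor;
    ///
    /// let s3 = S3::default();
    /// let bucket = s3.check_bucket_exists(Bucket::from_name(std::env::var("S3_TEST_BUCKET").unwrap()))
    ///     .expect("check if bucket exists").left().expect("existing bucket");
    ///
    /// let bucket_key = BucketKey::from_key(&bucket, "foobar.test");
    /// // The body closure is only called when the object does not already exist.
    /// let object = s3.object_present(bucket_key, CheckObjectImpl::Head, || {
    ///     Ok((Cursor::new(b"hello world".to_vec()), ObjectBodyMeta::default()))
    /// }).ensure().expect("make object present");
    /// println!("present: {}", object.key());
    /// ```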
    pub fn object_present<'b, 's: 'b, R, F>(
        &'s self,
        bucket_key: BucketKey<'b>,
        check_impl: CheckObjectImpl,
        body: F
    ) -> impl Ensure<Present<BucketKey<'b>>, EnsureAction = impl Meet<Met = Present<BucketKey<'b>>, Error = S3SyncError> + Captures1<'s> + Captures2<'b>> + Captures1<'s> + Captures2<'b>
    where R: Read + 's, F: FnOnce() -> Result<(R, ObjectBodyMeta), std::io::Error> + 's {
        move || {
            Ok(match self.check_object_exists(bucket_key, check_impl)? {
                Left(present) => Met(present.into()),
                Right(absent) => EnsureAction(move || {
                    let (body, meta) = body()?;
                    self.put_object(absent, body, meta)
                })
            })
        }
    }

    /// Returns an [Ensure] value that can be used to ensure that an object is absent in the S3 bucket.
    ///
    /// Warning: There can be a race condition between the check if the object exists and the delete operation.
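    ///
    /// A sketch of ensuring an object is gone (assumes `S3_TEST_BUCKET` is set and AWS credentials are configured; the key is illustrative):
    ///
    /// ```no_run
    /// use s3_sync::{S3, Bucket, BucketKey, CheckObjectImpl};
    /// use s3_sync::ensure::Ensure;
    ///
    /// let s3 = S3::default();
    /// let bucket = s3.check_bucket_exists(Bucket::from_name(std::env::var("S3_TEST_BUCKET").unwrap()))
    ///     .expect("check if bucket exists").left().expect("existing bucket");
    ///
    /// let bucket_key = BucketKey::from_key(&bucket, "foobar.test");
    /// // Deletes the object only if it currently exists.
    /// s3.object_absent(bucket_key, CheckObjectImpl::Head).ensure().expect("make object absent");
    /// ```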
    pub fn object_absent<'b, 's: 'b>(
        &'s self,
        bucket_key: BucketKey<'b>,
        check_impl: CheckObjectImpl
    ) -> impl Ensure<Absent<BucketKey<'b>>, EnsureAction = impl Meet<Met = Absent<BucketKey<'b>>, Error = S3SyncError> + Captures1<'s> + Captures2<'b>> + Captures1<'s> + Captures2<'b> {
        move || {
            Ok(match self.check_object_exists(bucket_key, check_impl)? {
                Right(absent) => Met(absent),
                Left(present) => EnsureAction(move || {
                    self.delete_object(present)
                })
            })
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use ensure::ExternalState;
    use std::io::Cursor;

    fn s3_test_bucket() -> Bucket {
        Bucket::from_name(std::env::var("S3_TEST_BUCKET").expect("S3_TEST_BUCKET not set"))
    }

    fn test_key() -> String {
        use std::time::SystemTime;
        format!("s3-sync-test/foo-{}", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_micros())
    }

    #[test]
    fn test_get_body_bucket_key() {
        use std::io::Cursor;

        let s3 = S3::default();
        let body = Cursor::new(b"hello world".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let object = BucketKey::from_key(&bucket, test_key());

        let object = s3.put_object(object, body, ObjectBodyMeta::default()).unwrap();

        let mut body = Vec::new();

        s3.get_body(&object).unwrap().read_to_end(&mut body).unwrap();

        assert_eq!(&body, b"hello world");
    }

    #[test]
    fn test_get_body_object() {
        use std::io::Cursor;

        let s3 = S3::default();
        let body = Cursor::new(b"hello world".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let object = BucketKey::from_key(&bucket, test_key());

        let object = s3.put_object(object, body, ObjectBodyMeta::default()).unwrap();

        let mut body = Vec::new();

        let object = s3.check_object_exists(object.invalidate_state(), CheckObjectImpl::Head).unwrap().unwrap_left();
        s3.get_body(&object).unwrap().read_to_end(&mut body).unwrap();

        assert_eq!(&body, b"hello world");
    }

    #[test]
    fn test_object_present() {
        use std::io::Cursor;

        let s3 = S3::default();
        let body = Cursor::new(b"hello world".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let object = BucketKey::from_key(&bucket, test_key());

        let object = s3.object_present(object, CheckObjectImpl::List, move || Ok((body, ObjectBodyMeta::default()))).ensure().or_failed_to("make object present");

        assert!(s3.check_object_exists(object.invalidate_state(), CheckObjectImpl::Head).unwrap().is_left());
    }

    #[test]
    fn test_object_absent() {
        use std::io::Cursor;

        let s3 = S3::default();
        let body = Cursor::new(b"hello world".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let object = BucketKey::from_key(&bucket, test_key());

        let object = s3.put_object(object, body, ObjectBodyMeta::default()).unwrap().invalidate_state();

        let object = s3.object_absent(object, CheckObjectImpl::Head).ensure().or_failed_to("make object absent");

        assert!(s3.check_object_exists(object.invalidate_state(), CheckObjectImpl::List).unwrap().is_right());
    }

    #[test]
    fn test_object_present_empty_read() {
        use std::io::Cursor;

        let s3 = S3::default();
        let body = Cursor::new(b"".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let object = BucketKey::from_key(&bucket, test_key());

        assert_matches!(s3.object_present(object, CheckObjectImpl::List, move || Ok((body, ObjectBodyMeta::default()))).ensure(), Err(S3SyncError::NoBodyError));
    }

    #[test]
    fn test_object_present_progress() {
        let mut s3 = S3::default();
        let body = Cursor::new(b"hello world".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let object = BucketKey::from_key(&bucket, test_key());

        let asserts: Vec<Box<dyn Fn(&TransferStatus)>> = vec![
            Box::new(|t| assert_matches!(t, TransferStatus::Init)),
            Box::new(|t| assert_matches!(t, TransferStatus::Progress(_))),
            Box::new(|t| assert_matches!(t, TransferStatus::Done(_))),
        ];

        let mut asserts = asserts.into_iter();

        s3.with_on_upload_progress(move |t| asserts.next().unwrap()(t), |s3| {
            s3.object_present(object, CheckObjectImpl::Head, move || Ok((body, ObjectBodyMeta::default()))).ensure().or_failed_to("make object present");
        });
    }

    #[test]
    fn test_delete_objects_given() {
        let s3 = S3::default();

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");

        let objects = vec![
            BucketKey::from_key(&bucket, "s3-sync-test/bar-1".to_owned()),
            BucketKey::from_key(&bucket, "s3-sync-test/bar-2".to_owned()),
        ];

        for object in objects.clone() {
            s3.put_object(object, Cursor::new(b"foo bar".to_vec()), ObjectBodyMeta::default()).unwrap();
        }

        let ops = s3.delete_objects(objects).or_failed_to("delete objects").collect::<Vec<_>>();

        // one batch
        assert_eq!(ops.len(), 1);
        // two objects in the batch
        assert_eq!(ops[0].len(), 2);

        assert_matches!(&ops[0][0], Ok(Absent(BucketKey { bucket: Present(Bucket { name }), key })) => {
                        assert_eq!(name, &s3_test_bucket().name);
                        assert_eq!(key, "s3-sync-test/bar-1")
        });

        assert_matches!(&ops[0][1], Ok(Absent(BucketKey { bucket: Present(Bucket { name }), key })) => {
                        assert_eq!(name, &s3_test_bucket().name);
                        assert_eq!(key, "s3-sync-test/bar-2")
        });
    }

    #[test]
    fn test_delete_objects_from_list() {
        let s3 = S3::default();

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");

        let objects = vec![
            BucketKey::from_key(&bucket, "s3-sync-test/baz-1".to_owned()),
            BucketKey::from_key(&bucket, "s3-sync-test/baz-2".to_owned()),
            BucketKey::from_key(&bucket, "s3-sync-test/bax".to_owned()),
        ];

        for object in objects {
            s3.put_object(object, Cursor::new(b"foo bar".to_vec()), ObjectBodyMeta::default()).unwrap();
        }

        let objects = s3.list_objects(&bucket, "s3-sync-test/baz".to_owned()).or_failed_to("get list of objects");
        let ops = s3.delete_objects(objects).or_failed_to("delete objects").collect::<Vec<_>>();

        // one batch
        assert_eq!(ops.len(), 1);
        // two objects in the batch
        assert_eq!(ops[0].len(), 2);

        assert_matches!(&ops[0][0], Ok(Absent(BucketKey { bucket: Present(Bucket { name }), key })) => {
                        assert_eq!(name, &s3_test_bucket().name);
                        assert_eq!(key, "s3-sync-test/baz-1")
        });

        assert_matches!(&ops[0][1], Ok(Absent(BucketKey { bucket: Present(Bucket { name }), key })) => {
                        assert_eq!(name, &s3_test_bucket().name);
                        assert_eq!(key, "s3-sync-test/baz-2")
        });
    }

    #[test]
    fn test_object_last_modified() {
        use std::io::Cursor;

        let s3 = S3::default();
        let body = Cursor::new(b"hello world".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let object = BucketKey::from_key(&bucket, test_key());

        let object = s3.object_present(object, CheckObjectImpl::List, move || Ok((body, ObjectBodyMeta::default()))).ensure().or_failed_to("make object present");

        let object = s3.check_object_exists(object.invalidate_state(), CheckObjectImpl::Head).unwrap().unwrap_left();
        assert!(matches!(object.last_modified(), LastModified::Rfc2822(_)));

        let object = s3.check_object_exists(object.unwrap_bucket_key().invalidate_state(), CheckObjectImpl::List).unwrap().unwrap_left();
        assert!(matches!(object.last_modified(), LastModified::Rfc3339(_)));
    }

    #[cfg(feature = "chrono")]
    #[test]
    fn test_object_last_modified_chrono() {
        use std::io::Cursor;

        let s3 = S3::default();
        let body = Cursor::new(b"hello world".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let object = BucketKey::from_key(&bucket, test_key());

        let object = s3.object_present(object, CheckObjectImpl::List, move || Ok((body, ObjectBodyMeta::default()))).ensure().or_failed_to("make object present");

        let object = s3.check_object_exists(object.invalidate_state(), CheckObjectImpl::Head).unwrap().unwrap_left();
        assert!(object.last_modified().parse().is_ok());

        let object = s3.check_object_exists(object.unwrap_bucket_key().invalidate_state(), CheckObjectImpl::List).unwrap().unwrap_left();
        assert!(object.last_modified().parse().is_ok());
    }

    #[test]
    fn test_bucket_lifetimes() {
        // TODO: rewrite using type impl Trait alias to separate input lifetimes from the return type when stable
        //       see: https://gist.github.com/jpastuszek/41fb5073dec33cae431ef25f8b576ac1
        // fn compiles_generic1<'s>(s3: &'s S3, bucket: String, key: String) -> impl Read + 's {
        //     let bucket = s3.check_bucket_exists(Bucket::from_name(bucket)).unwrap().left().unwrap();
        //     let bucket_key = BucketKey::from_key(&bucket, key);

        //     let present_bucket_key = bucket_key.assume_present();
        //     let input = s3.get_body(&present_bucket_key).unwrap();

        //     input // return input as borrowing from 's
        // }

        // fn compiles_generic2<'s>(s3: &'s S3, bucket: String, key: String) -> impl Read + 's {
        //     let bucket = s3.check_bucket_exists(Bucket::from_name(bucket)).unwrap().left().unwrap();
        //     let bucket_key = BucketKey::from_key(&bucket, key);

        //     let object = s3.check_object_exists(bucket_key, CheckObjectImpl::Head).unwrap().left().unwrap();
        //     let input = s3.get_body(&object).unwrap();

        //     input // return input as borrowing from 's
        // }

        fn compiles1<'s>(s3: &'s S3, bucket: String, key: String) -> impl Read + 's {
            let bucket = s3.check_bucket_exists(Bucket::from_name(bucket)).unwrap().left().unwrap();
            let bucket_key = BucketKey::from_key(&bucket, key);

            let present_bucket_key = bucket_key.assume_present();
            let input = s3.get_body_box(&present_bucket_key).unwrap();

            input // return input as borrowing from 's
        }

        fn compiles2<'s>(s3: &'s S3, bucket: String, key: String) -> impl Read + 's {
            let bucket = s3.check_bucket_exists(Bucket::from_name(bucket)).unwrap().left().unwrap();
            let bucket_key = BucketKey::from_key(&bucket, key);

            let object = s3.check_object_exists(bucket_key, CheckObjectImpl::Head).unwrap().left().unwrap();
            let input = s3.get_body_box(&object).unwrap();

            input // return input as borrowing from 's
        }

        let s3 = S3::default();
        let body = Cursor::new(b"hello world".to_vec());

        let bucket = s3.check_bucket_exists(s3_test_bucket()).or_failed_to("check if bucket exists").left().expect("bucket does not exist");
        let key = test_key();
        let object = BucketKey::from_key(&bucket, key.clone());

        s3.put_object(object, body, ObjectBodyMeta::default()).unwrap();

        let mut body = Vec::new();
        compiles1(&s3, s3_test_bucket().name().to_owned(), key.clone()).read_to_end(&mut body).unwrap();
        assert_eq!(&body, b"hello world");

        let mut body = Vec::new();
        compiles2(&s3, s3_test_bucket().name().to_owned(), key.clone()).read_to_end(&mut body).unwrap();
        assert_eq!(&body, b"hello world");
    }
}
1384}