s3_batch_put_tar/
lib.rs

1use log::*;
2use rusoto_core::RusotoError;
3
4use serde_derive::Serialize;
5use std::io;
6use std::io::{Read, Seek};
7use std::sync::mpsc::TrySendError;
8
9struct Batch<W: io::Write + io::Seek> {
10    start_time_ms: u64,
11    output: tar::Builder<W>,
12}
13impl<W: io::Write + io::Seek> Batch<W> {
14    pub fn new(start_time_ms: u64, output: W) -> Batch<W> {
15        Batch {
16            start_time_ms,
17            output: tar::Builder::new(output),
18        }
19    }
20    pub fn put_object(&mut self, write: QueuedWrite) -> io::Result<()> {
21        let pos = self
22            .output
23            .get_mut()
24            .seek(io::SeekFrom::Current(0))
25            .unwrap();
26        assert_eq!(pos, write.put_ref.offset_bytes as u64);
27        let mut header = tar::Header::new_old();
28        let h = &mut header;
29        h.set_path(write.input.name).expect("set_path()");
30        h.set_mtime(write.write_time.as_secs());
31        h.set_size(write.input.body.len() as _);
32        // its nice if the extracted files are readable + writable
33        h.set_mode(0b0110000000);
34        // after filling header fields; calculate header checksum
35        h.set_cksum();
36        self.output.append(&header, &write.input.body[..])?;
37        let pos = self
38            .output
39            .get_mut()
40            .seek(io::SeekFrom::Current(0))
41            .unwrap();
42        assert!(pos >= (write.put_ref.offset_bytes + write.put_ref.size_bytes) as u64);
43        Ok(())
44    }
45
46    pub fn into_inner(self) -> io::Result<W> {
47        self.output.into_inner()
48    }
49}
50
51#[derive(Debug)]
52pub enum BatchPutObjectError {
53    NameTooLong {
54        length: usize,
55    },
56    /// No more put-object operations could be submitted because the submission queue is already
57    /// full.
58    QueueFull,
59    /// No more put-object operations could be submitted because the object writer has been shut
60    /// down.
61    WriterClosed,
62    /// The body given in the BatchPutObjectRequest must produce Some(usize) from its size_hint()
63    /// method so that the correct space can be allocated for the object within the batch without
64    /// having to read the whole stream into memory.
65    UnsizedBody,
66}
67
68struct BatchProcessor<Client: rusoto_s3::S3> {
69    rx: std::sync::mpsc::Receiver<QueuedWrite>,
70    batch_duration_ms: u128,
71    client: Client,
72    /// Name of the target S3 bucket
73    bucket: String,
74    key_prefix: String,
75    rt: tokio::runtime::Handle,
76    storage_class: Option<String>,
77}
78impl<Client: rusoto_s3::S3> BatchProcessor<Client> {
79    pub fn new(
80        rx: std::sync::mpsc::Receiver<QueuedWrite>,
81        batch_duration_ms: u128,
82        client: Client,
83        bucket: String,
84        key_prefix: String,
85        rt: tokio::runtime::Handle,
86        storage_class: Option<String>,
87    ) -> Self {
88        BatchProcessor {
89            rx,
90            batch_duration_ms,
91            client,
92            bucket,
93            rt,
94            key_prefix,
95            storage_class,
96        }
97    }
98    pub fn process(self) {
99        self.rt.block_on(self.process_async())
100    }
101    async fn process_async(&self) {
102        let mut current_batch = None;
103        let timeout = std::time::Duration::from_millis(self.batch_duration_ms as _);
104        loop {
105            match self.rx.recv_timeout(timeout) {
106                Ok(req) => {
107                    if current_batch.is_none() {
108                        let tmp = match tempfile::tempfile() {
109                            Ok(t) => t,
110                            Err(e) => {
111                                log::error!("tempfile() failed: {:?}.  BatchProcessor exiting", e);
112                                break;
113                            }
114                        };
115                        let batch_start = self.batch_start_ms(req.write_time);
116                        current_batch = Some(Batch::new(batch_start, tmp));
117                    } else {
118                        let current_start = current_batch.as_ref().unwrap().start_time_ms;
119                        let next_start = req.put_ref.batch_id;
120                        if current_start != next_start {
121                            let last = current_batch.take().unwrap();
122                            if let Err(e) = self.finalise(last).await {
123                                error!("finalise() failed: {:?}", e);
124                            }
125                            let tmp = match tempfile::tempfile() {
126                                Ok(t) => t,
127                                Err(e) => {
128                                    error!("tempfile() failed: {:?}.  BatchProcessor exiting", e);
129                                    break;
130                                }
131                            };
132                            current_batch = Some(Batch::new(next_start, tmp));
133                        }
134                    };
135                    let mut batch = current_batch.take().unwrap();
136                    let (res, batch) = self
137                        .rt
138                        .spawn_blocking(move || (batch.put_object(req), batch))
139                        .await
140                        .unwrap();
141                    if let Err(e) = res {
142                        error!("failure to write batch-object: {:?}", e);
143                        // presume this batch is in some way compromised (e.g. out of disk space)
144                        // so we free it tp prevent any further attempts to write, in hopes that
145                        // any batches crated later may fare better once the underlying problem is
146                        // resolved
147                    } else {
148                        current_batch = Some(batch);
149                    }
150                }
151                Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
152                    if current_batch.is_some() {
153                        let now = std::time::SystemTime::now();
154                        let utc = now
155                            .duration_since(std::time::UNIX_EPOCH)
156                            .expect("duration_since()");
157                        let batch_start = self.batch_start_ms(utc);
158                        if batch_start > current_batch.as_ref().unwrap().start_time_ms {
159                            let last = current_batch.take().unwrap();
160                            if let Err(e) = self.finalise(last).await {
161                                error!("finalise() failed: {:?}", e);
162                            }
163                        }
164                    }
165                }
166                Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
167            }
168        }
169    }
170
171    async fn finalise(&self, batch: Batch<std::fs::File>) -> io::Result<()> {
172        let start_time_ms = batch.start_time_ms;
173        let mut file = batch.into_inner()?;
174        let mut attempt = 0;
175        const MAX_ATTEMPTS: u32 = 3;
176        loop {
177            attempt += 1;
178            file.seek(std::io::SeekFrom::Start(0))?;
179            let mut all = vec![];
180            file.read_to_end(&mut all)?; // FIXME: stream the file rather than loading into memory - https://stackoverflow.com/questions/57810173/streamed-upload-to-s3-with-rusoto
181            let req = rusoto_s3::PutObjectRequest {
182                body: Some(rusoto_s3::StreamingBody::from(all)),
183                bucket: self.bucket.clone(),
184                key: format!("{}{:}.tar", self.key_prefix, start_time_ms),
185                storage_class: self.storage_class.clone(),
186                ..Default::default()
187            };
188            match self.client.put_object(req).await {
189                Ok(_output) => return Ok(()),
190                Err(e) => {
191                    if attempt <= MAX_ATTEMPTS {
192                        if let RusotoError::HttpDispatch(err) = e {
193                            error!("put_object() failed on attempt {}: {:?}", attempt, err);
194                        }
195                    } else {
196                        return Err(io::Error::new(io::ErrorKind::Other, e));
197                    }
198                }
199            }
200            // wait for 100ms, 200ms
201            tokio::time::delay_for(std::time::Duration::from_millis(100 * (2_u64.pow(attempt))))
202                .await;
203        }
204    }
205
206    fn batch_start_ms(&self, utc: std::time::Duration) -> u64 {
207        (utc.as_millis() - (utc.as_millis() % self.batch_duration_ms)) as u64
208    }
209}
210
211struct QueuedWrite {
212    // This object tracks the BatchObjectRef we handed back to the caller when we accepted this
213    // in order that when we really come to insert the item into the tar file we can assert that
214    // the *actual* file layout matches the byte-offset values we had predicted
215    put_ref: BatchObjectRef,
216    input: BatchPutObjectRequest,
217    write_time: std::time::Duration,
218}
219
220struct Coalesce {
221    last_batch_id: Option<u64>,
222    batch_duration_ms: u64,
223    next_write_offset_bytes: usize,
224    tx: std::sync::mpsc::SyncSender<QueuedWrite>,
225}
226impl Coalesce {
227    const TAR_BLOCK_SIZE: usize = 512;
228
229    pub fn new(batch_duration_ms: u64, tx: std::sync::mpsc::SyncSender<QueuedWrite>) -> Self {
230        Coalesce {
231            last_batch_id: None,
232            batch_duration_ms,
233            next_write_offset_bytes: 0,
234            tx,
235        }
236    }
237    pub fn put_object(
238        &mut self,
239        input: BatchPutObjectRequest,
240    ) -> Result<BatchObjectRef, BatchPutObjectError> {
241        let now = std::time::SystemTime::now();
242        let utc = now
243            .duration_since(std::time::UNIX_EPOCH)
244            .expect("duration_since()");
245        let this_batch_id = self.batch_start_ms(utc);
246
247        if self.last_batch_id.is_none() {
248            self.last_batch_id = Some(this_batch_id);
249        } else if *self.last_batch_id.as_ref().unwrap() < this_batch_id {
250            self.next_write_offset_bytes = 0;
251            self.last_batch_id = Some(this_batch_id);
252        }
253        let size = input.body.len();
254        let put_ref = BatchObjectRef {
255            batch_id: *self.last_batch_id.as_ref().unwrap(),
256            offset_bytes: self.next_write_offset_bytes,
257            size_bytes: size,
258        };
259        let tar_space_required = Self::TAR_BLOCK_SIZE
260            + size
261            + if size % Self::TAR_BLOCK_SIZE == 0 {
262                0
263            } else {
264                Self::TAR_BLOCK_SIZE - size % Self::TAR_BLOCK_SIZE
265            };
266        self.next_write_offset_bytes += tar_space_required;
267
268        let write = QueuedWrite {
269            put_ref: put_ref.clone(),
270            write_time: utc,
271            input,
272        };
273        match self.tx.try_send(write) {
274            Ok(_) => Ok(put_ref),
275            Err(e) => {
276                // undo our update to the shared state made above, since we didn't write the data
277                self.next_write_offset_bytes -= tar_space_required;
278                Err(match e {
279                    TrySendError::Full(_) => BatchPutObjectError::QueueFull,
280                    TrySendError::Disconnected(_) => BatchPutObjectError::WriterClosed,
281                })
282            }
283        }
284    }
285
286    fn batch_start_ms(&self, utc: std::time::Duration) -> u64 {
287        let utc = utc.as_millis() as u64;
288        utc - utc % self.batch_duration_ms
289    }
290}
291
292pub struct S3BatchPutClient {
293    inner: std::sync::Arc<std::sync::Mutex<Coalesce>>,
294}
295impl S3BatchPutClient {
296    /// Queue the given object to be written into a batch.
297    ///
298    /// Batches are written do disk before being uploaded to S3.  The writing to disk and eventual
299    /// upload to S3 will be performed on another thread, therefore this method does not block the
300    /// caller or need to be `await`ed.
301    pub fn put_object(
302        &mut self,
303        input: BatchPutObjectRequest,
304    ) -> Result<BatchObjectRef, BatchPutObjectError> {
305        self.inner.lock().unwrap().put_object(input)
306    }
307}
308
309pub struct BatchPutObjectRequest {
310    pub name: String,
311    pub body: Vec<u8>,
312}
313
314#[derive(Serialize, Debug, Clone)]
315pub struct BatchObjectRef {
316    pub batch_id: u64,
317    pub offset_bytes: usize,
318    pub size_bytes: usize,
319}
320
321pub struct ClientBuilder<Client: rusoto_s3::S3> {
322    batch_duration: Option<u64>,
323    bucket: Option<String>,
324    key_prefix: Option<String>,
325    client: Option<Client>,
326    rt: Option<tokio::runtime::Handle>,
327    storage_class: Option<String>,
328}
329impl<Client: rusoto_s3::S3> Default for ClientBuilder<Client> {
330    fn default() -> Self {
331        ClientBuilder {
332            batch_duration: None,
333            bucket: None,
334            client: None,
335            key_prefix: None,
336            rt: None,
337            storage_class: None,
338        }
339    }
340}
341impl<Client: rusoto_s3::S3 + Send + 'static> ClientBuilder<Client> {
342    pub fn batch_duration(mut self, batch_duration: u64) -> Self {
343        self.batch_duration = Some(batch_duration);
344        self
345    }
346
347    pub fn bucket<S: ToString>(mut self, name: S) -> Self {
348        self.bucket = Some(name.to_string());
349        self
350    }
351
352    pub fn key_prefix<S: ToString>(mut self, prefix: S) -> Self {
353        self.key_prefix = Some(prefix.to_string());
354        self
355    }
356
357    pub fn s3_client(mut self, s3_client: Client) -> Self {
358        self.client = Some(s3_client);
359        self
360    }
361
362    pub fn handle(mut self, rt: tokio::runtime::Handle) -> Self {
363        self.rt = Some(rt);
364        self
365    }
366
367    /// Optional S3 storage class to request when storing resulting tar files
368    pub fn storage_class(mut self, storage_class: String) -> Self {
369        self.storage_class = Some(storage_class);
370        self
371    }
372
373    pub fn build(self) -> Result<S3BatchPutClient, BuildError> {
374        let (tx, rx) = std::sync::mpsc::sync_channel(30);
375        let proc = BatchProcessor::new(
376            rx,
377            self.batch_duration
378                .ok_or(BuildError::MissingBatchDuration)? as _,
379            self.client.ok_or(BuildError::MissingS3Client)?,
380            self.bucket.ok_or(BuildError::MissingBucket)?,
381            self.key_prefix.unwrap_or_else(|| "".to_string()),
382            self.rt.ok_or(BuildError::MissingRuntime)?,
383            self.storage_class,
384        );
385        std::thread::spawn(|| proc.process());
386        let client = S3BatchPutClient {
387            inner: std::sync::Arc::new(std::sync::Mutex::new(Coalesce::new(
388                self.batch_duration
389                    .ok_or(BuildError::MissingBatchDuration)? as _,
390                tx,
391            ))),
392        };
393        Ok(client)
394    }
395}
396
397#[derive(Debug)]
398pub enum BuildError {
399    MissingBucket,
400    MissingBatchDuration,
401    MissingS3Client,
402    MissingRuntime,
403}
404
405#[cfg(test)]
406mod tests {
407    use super::*;
408    use async_trait::async_trait;
409    use rusoto_core::RusotoError;
410    use rusoto_s3::{
411        AbortMultipartUploadError, AbortMultipartUploadOutput, AbortMultipartUploadRequest,
412        CompleteMultipartUploadError, CompleteMultipartUploadOutput,
413        CompleteMultipartUploadRequest, CopyObjectError, CopyObjectOutput, CopyObjectRequest,
414        CreateBucketError, CreateBucketOutput, CreateBucketRequest, CreateMultipartUploadError,
415        CreateMultipartUploadOutput, CreateMultipartUploadRequest,
416        DeleteBucketAnalyticsConfigurationError, DeleteBucketAnalyticsConfigurationRequest,
417        DeleteBucketCorsError, DeleteBucketCorsRequest, DeleteBucketEncryptionError,
418        DeleteBucketEncryptionRequest, DeleteBucketError, DeleteBucketInventoryConfigurationError,
419        DeleteBucketInventoryConfigurationRequest, DeleteBucketLifecycleError,
420        DeleteBucketLifecycleRequest, DeleteBucketMetricsConfigurationError,
421        DeleteBucketMetricsConfigurationRequest, DeleteBucketPolicyError,
422        DeleteBucketPolicyRequest, DeleteBucketReplicationError, DeleteBucketReplicationRequest,
423        DeleteBucketRequest, DeleteBucketTaggingError, DeleteBucketTaggingRequest,
424        DeleteBucketWebsiteError, DeleteBucketWebsiteRequest, DeleteObjectError,
425        DeleteObjectOutput, DeleteObjectRequest, DeleteObjectTaggingError,
426        DeleteObjectTaggingOutput, DeleteObjectTaggingRequest, DeleteObjectsError,
427        DeleteObjectsOutput, DeleteObjectsRequest, DeletePublicAccessBlockError,
428        DeletePublicAccessBlockRequest, GetBucketAccelerateConfigurationError,
429        GetBucketAccelerateConfigurationOutput, GetBucketAccelerateConfigurationRequest,
430        GetBucketAclError, GetBucketAclOutput, GetBucketAclRequest,
431        GetBucketAnalyticsConfigurationError, GetBucketAnalyticsConfigurationOutput,
432        GetBucketAnalyticsConfigurationRequest, GetBucketCorsError, GetBucketCorsOutput,
433        GetBucketCorsRequest, GetBucketEncryptionError, GetBucketEncryptionOutput,
434        GetBucketEncryptionRequest, GetBucketInventoryConfigurationError,
435        GetBucketInventoryConfigurationOutput, GetBucketInventoryConfigurationRequest,
436        GetBucketLifecycleConfigurationError, GetBucketLifecycleConfigurationOutput,
437        GetBucketLifecycleConfigurationRequest, GetBucketLifecycleError, GetBucketLifecycleOutput,
438        GetBucketLifecycleRequest, GetBucketLocationError, GetBucketLocationOutput,
439        GetBucketLocationRequest, GetBucketLoggingError, GetBucketLoggingOutput,
440        GetBucketLoggingRequest, GetBucketMetricsConfigurationError,
441        GetBucketMetricsConfigurationOutput, GetBucketMetricsConfigurationRequest,
442        GetBucketNotificationConfigurationError, GetBucketNotificationConfigurationRequest,
443        GetBucketNotificationError, GetBucketPolicyError, GetBucketPolicyOutput,
444        GetBucketPolicyRequest, GetBucketPolicyStatusError, GetBucketPolicyStatusOutput,
445        GetBucketPolicyStatusRequest, GetBucketReplicationError, GetBucketReplicationOutput,
446        GetBucketReplicationRequest, GetBucketRequestPaymentError, GetBucketRequestPaymentOutput,
447        GetBucketRequestPaymentRequest, GetBucketTaggingError, GetBucketTaggingOutput,
448        GetBucketTaggingRequest, GetBucketVersioningError, GetBucketVersioningOutput,
449        GetBucketVersioningRequest, GetBucketWebsiteError, GetBucketWebsiteOutput,
450        GetBucketWebsiteRequest, GetObjectAclError, GetObjectAclOutput, GetObjectAclRequest,
451        GetObjectError, GetObjectLegalHoldError, GetObjectLegalHoldOutput,
452        GetObjectLegalHoldRequest, GetObjectLockConfigurationError,
453        GetObjectLockConfigurationOutput, GetObjectLockConfigurationRequest, GetObjectOutput,
454        GetObjectRequest, GetObjectRetentionError, GetObjectRetentionOutput,
455        GetObjectRetentionRequest, GetObjectTaggingError, GetObjectTaggingOutput,
456        GetObjectTaggingRequest, GetObjectTorrentError, GetObjectTorrentOutput,
457        GetObjectTorrentRequest, GetPublicAccessBlockError, GetPublicAccessBlockOutput,
458        GetPublicAccessBlockRequest, HeadBucketError, HeadBucketRequest, HeadObjectError,
459        HeadObjectOutput, HeadObjectRequest, ListBucketAnalyticsConfigurationsError,
460        ListBucketAnalyticsConfigurationsOutput, ListBucketAnalyticsConfigurationsRequest,
461        ListBucketInventoryConfigurationsError, ListBucketInventoryConfigurationsOutput,
462        ListBucketInventoryConfigurationsRequest, ListBucketMetricsConfigurationsError,
463        ListBucketMetricsConfigurationsOutput, ListBucketMetricsConfigurationsRequest,
464        ListBucketsError, ListBucketsOutput, ListMultipartUploadsError, ListMultipartUploadsOutput,
465        ListMultipartUploadsRequest, ListObjectVersionsError, ListObjectVersionsOutput,
466        ListObjectVersionsRequest, ListObjectsError, ListObjectsOutput, ListObjectsRequest,
467        ListObjectsV2Error, ListObjectsV2Output, ListObjectsV2Request, ListPartsError,
468        ListPartsOutput, ListPartsRequest, NotificationConfiguration,
469        NotificationConfigurationDeprecated, PutBucketAccelerateConfigurationError,
470        PutBucketAccelerateConfigurationRequest, PutBucketAclError, PutBucketAclRequest,
471        PutBucketAnalyticsConfigurationError, PutBucketAnalyticsConfigurationRequest,
472        PutBucketCorsError, PutBucketCorsRequest, PutBucketEncryptionError,
473        PutBucketEncryptionRequest, PutBucketInventoryConfigurationError,
474        PutBucketInventoryConfigurationRequest, PutBucketLifecycleConfigurationError,
475        PutBucketLifecycleConfigurationRequest, PutBucketLifecycleError, PutBucketLifecycleRequest,
476        PutBucketLoggingError, PutBucketLoggingRequest, PutBucketMetricsConfigurationError,
477        PutBucketMetricsConfigurationRequest, PutBucketNotificationConfigurationError,
478        PutBucketNotificationConfigurationRequest, PutBucketNotificationError,
479        PutBucketNotificationRequest, PutBucketPolicyError, PutBucketPolicyRequest,
480        PutBucketReplicationError, PutBucketReplicationRequest, PutBucketRequestPaymentError,
481        PutBucketRequestPaymentRequest, PutBucketTaggingError, PutBucketTaggingRequest,
482        PutBucketVersioningError, PutBucketVersioningRequest, PutBucketWebsiteError,
483        PutBucketWebsiteRequest, PutObjectAclError, PutObjectAclOutput, PutObjectAclRequest,
484        PutObjectError, PutObjectLegalHoldError, PutObjectLegalHoldOutput,
485        PutObjectLegalHoldRequest, PutObjectLockConfigurationError,
486        PutObjectLockConfigurationOutput, PutObjectLockConfigurationRequest, PutObjectOutput,
487        PutObjectRequest, PutObjectRetentionError, PutObjectRetentionOutput,
488        PutObjectRetentionRequest, PutObjectTaggingError, PutObjectTaggingOutput,
489        PutObjectTaggingRequest, PutPublicAccessBlockError, PutPublicAccessBlockRequest,
490        RestoreObjectError, RestoreObjectOutput, RestoreObjectRequest, SelectObjectContentError,
491        SelectObjectContentOutput, SelectObjectContentRequest, UploadPartCopyError,
492        UploadPartCopyOutput, UploadPartCopyRequest, UploadPartError, UploadPartOutput,
493        UploadPartRequest,
494    };
495
496    #[derive(Default)]
497    struct Mock {
498        puts: std::sync::Arc<std::sync::Mutex<std::cell::RefCell<Vec<PutObjectRequest>>>>,
499    }
500    #[async_trait]
501    impl rusoto_s3::S3 for Mock {
502        async fn abort_multipart_upload(
503            &self,
504            _input: AbortMultipartUploadRequest,
505        ) -> Result<AbortMultipartUploadOutput, RusotoError<AbortMultipartUploadError>> {
506            unimplemented!()
507        }
508
509        async fn complete_multipart_upload(
510            &self,
511            _input: CompleteMultipartUploadRequest,
512        ) -> Result<CompleteMultipartUploadOutput, RusotoError<CompleteMultipartUploadError>>
513        {
514            unimplemented!()
515        }
516
517        async fn copy_object(
518            &self,
519            _input: CopyObjectRequest,
520        ) -> Result<CopyObjectOutput, RusotoError<CopyObjectError>> {
521            unimplemented!()
522        }
523
524        async fn create_bucket(
525            &self,
526            _input: CreateBucketRequest,
527        ) -> Result<CreateBucketOutput, RusotoError<CreateBucketError>> {
528            unimplemented!()
529        }
530
531        async fn create_multipart_upload(
532            &self,
533            _input: CreateMultipartUploadRequest,
534        ) -> Result<CreateMultipartUploadOutput, RusotoError<CreateMultipartUploadError>> {
535            unimplemented!()
536        }
537
538        async fn delete_bucket(
539            &self,
540            _input: DeleteBucketRequest,
541        ) -> Result<(), RusotoError<DeleteBucketError>> {
542            unimplemented!()
543        }
544
545        async fn delete_bucket_analytics_configuration(
546            &self,
547            _input: DeleteBucketAnalyticsConfigurationRequest,
548        ) -> Result<(), RusotoError<DeleteBucketAnalyticsConfigurationError>> {
549            unimplemented!()
550        }
551
552        async fn delete_bucket_cors(
553            &self,
554            _input: DeleteBucketCorsRequest,
555        ) -> Result<(), RusotoError<DeleteBucketCorsError>> {
556            unimplemented!()
557        }
558
559        async fn delete_bucket_encryption(
560            &self,
561            _input: DeleteBucketEncryptionRequest,
562        ) -> Result<(), RusotoError<DeleteBucketEncryptionError>> {
563            unimplemented!()
564        }
565
566        async fn delete_bucket_inventory_configuration(
567            &self,
568            _input: DeleteBucketInventoryConfigurationRequest,
569        ) -> Result<(), RusotoError<DeleteBucketInventoryConfigurationError>> {
570            unimplemented!()
571        }
572
573        async fn delete_bucket_lifecycle(
574            &self,
575            _input: DeleteBucketLifecycleRequest,
576        ) -> Result<(), RusotoError<DeleteBucketLifecycleError>> {
577            unimplemented!()
578        }
579
580        async fn delete_bucket_metrics_configuration(
581            &self,
582            _input: DeleteBucketMetricsConfigurationRequest,
583        ) -> Result<(), RusotoError<DeleteBucketMetricsConfigurationError>> {
584            unimplemented!()
585        }
586
587        async fn delete_bucket_policy(
588            &self,
589            _input: DeleteBucketPolicyRequest,
590        ) -> Result<(), RusotoError<DeleteBucketPolicyError>> {
591            unimplemented!()
592        }
593
594        async fn delete_bucket_replication(
595            &self,
596            _input: DeleteBucketReplicationRequest,
597        ) -> Result<(), RusotoError<DeleteBucketReplicationError>> {
598            unimplemented!()
599        }
600
601        async fn delete_bucket_tagging(
602            &self,
603            _input: DeleteBucketTaggingRequest,
604        ) -> Result<(), RusotoError<DeleteBucketTaggingError>> {
605            unimplemented!()
606        }
607
608        async fn delete_bucket_website(
609            &self,
610            _input: DeleteBucketWebsiteRequest,
611        ) -> Result<(), RusotoError<DeleteBucketWebsiteError>> {
612            unimplemented!()
613        }
614
615        async fn delete_object(
616            &self,
617            _input: DeleteObjectRequest,
618        ) -> Result<DeleteObjectOutput, RusotoError<DeleteObjectError>> {
619            unimplemented!()
620        }
621
622        async fn delete_object_tagging(
623            &self,
624            _input: DeleteObjectTaggingRequest,
625        ) -> Result<DeleteObjectTaggingOutput, RusotoError<DeleteObjectTaggingError>> {
626            unimplemented!()
627        }
628
629        async fn delete_objects(
630            &self,
631            _input: DeleteObjectsRequest,
632        ) -> Result<DeleteObjectsOutput, RusotoError<DeleteObjectsError>> {
633            unimplemented!()
634        }
635
636        async fn delete_public_access_block(
637            &self,
638            _input: DeletePublicAccessBlockRequest,
639        ) -> Result<(), RusotoError<DeletePublicAccessBlockError>> {
640            unimplemented!()
641        }
642
643        async fn get_bucket_accelerate_configuration(
644            &self,
645            _input: GetBucketAccelerateConfigurationRequest,
646        ) -> Result<
647            GetBucketAccelerateConfigurationOutput,
648            RusotoError<GetBucketAccelerateConfigurationError>,
649        > {
650            unimplemented!()
651        }
652
653        async fn get_bucket_acl(
654            &self,
655            _input: GetBucketAclRequest,
656        ) -> Result<GetBucketAclOutput, RusotoError<GetBucketAclError>> {
657            unimplemented!()
658        }
659
660        async fn get_bucket_analytics_configuration(
661            &self,
662            _input: GetBucketAnalyticsConfigurationRequest,
663        ) -> Result<
664            GetBucketAnalyticsConfigurationOutput,
665            RusotoError<GetBucketAnalyticsConfigurationError>,
666        > {
667            unimplemented!()
668        }
669
670        async fn get_bucket_cors(
671            &self,
672            _input: GetBucketCorsRequest,
673        ) -> Result<GetBucketCorsOutput, RusotoError<GetBucketCorsError>> {
674            unimplemented!()
675        }
676
677        async fn get_bucket_encryption(
678            &self,
679            _input: GetBucketEncryptionRequest,
680        ) -> Result<GetBucketEncryptionOutput, RusotoError<GetBucketEncryptionError>> {
681            unimplemented!()
682        }
683
684        async fn get_bucket_inventory_configuration(
685            &self,
686            _input: GetBucketInventoryConfigurationRequest,
687        ) -> Result<
688            GetBucketInventoryConfigurationOutput,
689            RusotoError<GetBucketInventoryConfigurationError>,
690        > {
691            unimplemented!()
692        }
693
694        async fn get_bucket_lifecycle(
695            &self,
696            _input: GetBucketLifecycleRequest,
697        ) -> Result<GetBucketLifecycleOutput, RusotoError<GetBucketLifecycleError>> {
698            unimplemented!()
699        }
700
701        async fn get_bucket_lifecycle_configuration(
702            &self,
703            _input: GetBucketLifecycleConfigurationRequest,
704        ) -> Result<
705            GetBucketLifecycleConfigurationOutput,
706            RusotoError<GetBucketLifecycleConfigurationError>,
707        > {
708            unimplemented!()
709        }
710
711        async fn get_bucket_location(
712            &self,
713            _input: GetBucketLocationRequest,
714        ) -> Result<GetBucketLocationOutput, RusotoError<GetBucketLocationError>> {
715            unimplemented!()
716        }
717
718        async fn get_bucket_logging(
719            &self,
720            _input: GetBucketLoggingRequest,
721        ) -> Result<GetBucketLoggingOutput, RusotoError<GetBucketLoggingError>> {
722            unimplemented!()
723        }
724
725        async fn get_bucket_metrics_configuration(
726            &self,
727            _input: GetBucketMetricsConfigurationRequest,
728        ) -> Result<
729            GetBucketMetricsConfigurationOutput,
730            RusotoError<GetBucketMetricsConfigurationError>,
731        > {
732            unimplemented!()
733        }
734
735        async fn get_bucket_notification(
736            &self,
737            _input: GetBucketNotificationConfigurationRequest,
738        ) -> Result<NotificationConfigurationDeprecated, RusotoError<GetBucketNotificationError>>
739        {
740            unimplemented!()
741        }
742
743        async fn get_bucket_notification_configuration(
744            &self,
745            _input: GetBucketNotificationConfigurationRequest,
746        ) -> Result<NotificationConfiguration, RusotoError<GetBucketNotificationConfigurationError>>
747        {
748            unimplemented!()
749        }
750
751        async fn get_bucket_policy(
752            &self,
753            _input: GetBucketPolicyRequest,
754        ) -> Result<GetBucketPolicyOutput, RusotoError<GetBucketPolicyError>> {
755            unimplemented!()
756        }
757
758        async fn get_bucket_policy_status(
759            &self,
760            _input: GetBucketPolicyStatusRequest,
761        ) -> Result<GetBucketPolicyStatusOutput, RusotoError<GetBucketPolicyStatusError>> {
762            unimplemented!()
763        }
764
765        async fn get_bucket_replication(
766            &self,
767            _input: GetBucketReplicationRequest,
768        ) -> Result<GetBucketReplicationOutput, RusotoError<GetBucketReplicationError>> {
769            unimplemented!()
770        }
771
772        async fn get_bucket_request_payment(
773            &self,
774            _input: GetBucketRequestPaymentRequest,
775        ) -> Result<GetBucketRequestPaymentOutput, RusotoError<GetBucketRequestPaymentError>>
776        {
777            unimplemented!()
778        }
779
780        async fn get_bucket_tagging(
781            &self,
782            _input: GetBucketTaggingRequest,
783        ) -> Result<GetBucketTaggingOutput, RusotoError<GetBucketTaggingError>> {
784            unimplemented!()
785        }
786
787        async fn get_bucket_versioning(
788            &self,
789            _input: GetBucketVersioningRequest,
790        ) -> Result<GetBucketVersioningOutput, RusotoError<GetBucketVersioningError>> {
791            unimplemented!()
792        }
793
794        async fn get_bucket_website(
795            &self,
796            _input: GetBucketWebsiteRequest,
797        ) -> Result<GetBucketWebsiteOutput, RusotoError<GetBucketWebsiteError>> {
798            unimplemented!()
799        }
800
801        async fn get_object(
802            &self,
803            _input: GetObjectRequest,
804        ) -> Result<GetObjectOutput, RusotoError<GetObjectError>> {
805            unimplemented!()
806        }
807
808        async fn get_object_acl(
809            &self,
810            _input: GetObjectAclRequest,
811        ) -> Result<GetObjectAclOutput, RusotoError<GetObjectAclError>> {
812            unimplemented!()
813        }
814
815        async fn get_object_legal_hold(
816            &self,
817            _input: GetObjectLegalHoldRequest,
818        ) -> Result<GetObjectLegalHoldOutput, RusotoError<GetObjectLegalHoldError>> {
819            unimplemented!()
820        }
821
822        async fn get_object_lock_configuration(
823            &self,
824            _input: GetObjectLockConfigurationRequest,
825        ) -> Result<GetObjectLockConfigurationOutput, RusotoError<GetObjectLockConfigurationError>>
826        {
827            unimplemented!()
828        }
829
830        async fn get_object_retention(
831            &self,
832            _input: GetObjectRetentionRequest,
833        ) -> Result<GetObjectRetentionOutput, RusotoError<GetObjectRetentionError>> {
834            unimplemented!()
835        }
836
837        async fn get_object_tagging(
838            &self,
839            _input: GetObjectTaggingRequest,
840        ) -> Result<GetObjectTaggingOutput, RusotoError<GetObjectTaggingError>> {
841            unimplemented!()
842        }
843
844        async fn get_object_torrent(
845            &self,
846            _input: GetObjectTorrentRequest,
847        ) -> Result<GetObjectTorrentOutput, RusotoError<GetObjectTorrentError>> {
848            unimplemented!()
849        }
850
851        async fn get_public_access_block(
852            &self,
853            _input: GetPublicAccessBlockRequest,
854        ) -> Result<GetPublicAccessBlockOutput, RusotoError<GetPublicAccessBlockError>> {
855            unimplemented!()
856        }
857
858        async fn head_bucket(
859            &self,
860            _input: HeadBucketRequest,
861        ) -> Result<(), RusotoError<HeadBucketError>> {
862            unimplemented!()
863        }
864
865        async fn head_object(
866            &self,
867            _input: HeadObjectRequest,
868        ) -> Result<HeadObjectOutput, RusotoError<HeadObjectError>> {
869            unimplemented!()
870        }
871
872        async fn list_bucket_analytics_configurations(
873            &self,
874            _input: ListBucketAnalyticsConfigurationsRequest,
875        ) -> Result<
876            ListBucketAnalyticsConfigurationsOutput,
877            RusotoError<ListBucketAnalyticsConfigurationsError>,
878        > {
879            unimplemented!()
880        }
881
882        async fn list_bucket_inventory_configurations(
883            &self,
884            _input: ListBucketInventoryConfigurationsRequest,
885        ) -> Result<
886            ListBucketInventoryConfigurationsOutput,
887            RusotoError<ListBucketInventoryConfigurationsError>,
888        > {
889            unimplemented!()
890        }
891
892        async fn list_bucket_metrics_configurations(
893            &self,
894            _input: ListBucketMetricsConfigurationsRequest,
895        ) -> Result<
896            ListBucketMetricsConfigurationsOutput,
897            RusotoError<ListBucketMetricsConfigurationsError>,
898        > {
899            unimplemented!()
900        }
901
902        async fn list_buckets(&self) -> Result<ListBucketsOutput, RusotoError<ListBucketsError>> {
903            unimplemented!()
904        }
905
906        async fn list_multipart_uploads(
907            &self,
908            _input: ListMultipartUploadsRequest,
909        ) -> Result<ListMultipartUploadsOutput, RusotoError<ListMultipartUploadsError>> {
910            unimplemented!()
911        }
912
913        async fn list_object_versions(
914            &self,
915            _input: ListObjectVersionsRequest,
916        ) -> Result<ListObjectVersionsOutput, RusotoError<ListObjectVersionsError>> {
917            unimplemented!()
918        }
919
920        async fn list_objects(
921            &self,
922            _input: ListObjectsRequest,
923        ) -> Result<ListObjectsOutput, RusotoError<ListObjectsError>> {
924            unimplemented!()
925        }
926
927        async fn list_objects_v2(
928            &self,
929            _input: ListObjectsV2Request,
930        ) -> Result<ListObjectsV2Output, RusotoError<ListObjectsV2Error>> {
931            unimplemented!()
932        }
933
934        async fn list_parts(
935            &self,
936            _input: ListPartsRequest,
937        ) -> Result<ListPartsOutput, RusotoError<ListPartsError>> {
938            unimplemented!()
939        }
940
941        async fn put_bucket_accelerate_configuration(
942            &self,
943            _input: PutBucketAccelerateConfigurationRequest,
944        ) -> Result<(), RusotoError<PutBucketAccelerateConfigurationError>> {
945            unimplemented!()
946        }
947
948        async fn put_bucket_acl(
949            &self,
950            _input: PutBucketAclRequest,
951        ) -> Result<(), RusotoError<PutBucketAclError>> {
952            unimplemented!()
953        }
954
955        async fn put_bucket_analytics_configuration(
956            &self,
957            _input: PutBucketAnalyticsConfigurationRequest,
958        ) -> Result<(), RusotoError<PutBucketAnalyticsConfigurationError>> {
959            unimplemented!()
960        }
961
962        async fn put_bucket_cors(
963            &self,
964            _input: PutBucketCorsRequest,
965        ) -> Result<(), RusotoError<PutBucketCorsError>> {
966            unimplemented!()
967        }
968
969        async fn put_bucket_encryption(
970            &self,
971            _input: PutBucketEncryptionRequest,
972        ) -> Result<(), RusotoError<PutBucketEncryptionError>> {
973            unimplemented!()
974        }
975
976        async fn put_bucket_inventory_configuration(
977            &self,
978            _input: PutBucketInventoryConfigurationRequest,
979        ) -> Result<(), RusotoError<PutBucketInventoryConfigurationError>> {
980            unimplemented!()
981        }
982
983        async fn put_bucket_lifecycle(
984            &self,
985            _input: PutBucketLifecycleRequest,
986        ) -> Result<(), RusotoError<PutBucketLifecycleError>> {
987            unimplemented!()
988        }
989
990        async fn put_bucket_lifecycle_configuration(
991            &self,
992            _input: PutBucketLifecycleConfigurationRequest,
993        ) -> Result<(), RusotoError<PutBucketLifecycleConfigurationError>> {
994            unimplemented!()
995        }
996
997        async fn put_bucket_logging(
998            &self,
999            _input: PutBucketLoggingRequest,
1000        ) -> Result<(), RusotoError<PutBucketLoggingError>> {
1001            unimplemented!()
1002        }
1003
1004        async fn put_bucket_metrics_configuration(
1005            &self,
1006            _input: PutBucketMetricsConfigurationRequest,
1007        ) -> Result<(), RusotoError<PutBucketMetricsConfigurationError>> {
1008            unimplemented!()
1009        }
1010
1011        async fn put_bucket_notification(
1012            &self,
1013            _input: PutBucketNotificationRequest,
1014        ) -> Result<(), RusotoError<PutBucketNotificationError>> {
1015            unimplemented!()
1016        }
1017
1018        async fn put_bucket_notification_configuration(
1019            &self,
1020            _input: PutBucketNotificationConfigurationRequest,
1021        ) -> Result<(), RusotoError<PutBucketNotificationConfigurationError>> {
1022            unimplemented!()
1023        }
1024
1025        async fn put_bucket_policy(
1026            &self,
1027            _input: PutBucketPolicyRequest,
1028        ) -> Result<(), RusotoError<PutBucketPolicyError>> {
1029            unimplemented!()
1030        }
1031
1032        async fn put_bucket_replication(
1033            &self,
1034            _input: PutBucketReplicationRequest,
1035        ) -> Result<(), RusotoError<PutBucketReplicationError>> {
1036            unimplemented!()
1037        }
1038
1039        async fn put_bucket_request_payment(
1040            &self,
1041            _input: PutBucketRequestPaymentRequest,
1042        ) -> Result<(), RusotoError<PutBucketRequestPaymentError>> {
1043            unimplemented!()
1044        }
1045
1046        async fn put_bucket_tagging(
1047            &self,
1048            _input: PutBucketTaggingRequest,
1049        ) -> Result<(), RusotoError<PutBucketTaggingError>> {
1050            unimplemented!()
1051        }
1052
1053        async fn put_bucket_versioning(
1054            &self,
1055            _input: PutBucketVersioningRequest,
1056        ) -> Result<(), RusotoError<PutBucketVersioningError>> {
1057            unimplemented!()
1058        }
1059
1060        async fn put_bucket_website(
1061            &self,
1062            _input: PutBucketWebsiteRequest,
1063        ) -> Result<(), RusotoError<PutBucketWebsiteError>> {
1064            unimplemented!()
1065        }
1066
1067        async fn put_object(
1068            &self,
1069            input: PutObjectRequest,
1070        ) -> Result<PutObjectOutput, RusotoError<PutObjectError>> {
1071            self.puts.lock().unwrap().borrow_mut().push(input);
1072            Ok(PutObjectOutput::default())
1073        }
1074
1075        async fn put_object_acl(
1076            &self,
1077            _input: PutObjectAclRequest,
1078        ) -> Result<PutObjectAclOutput, RusotoError<PutObjectAclError>> {
1079            unimplemented!()
1080        }
1081
1082        async fn put_object_legal_hold(
1083            &self,
1084            _input: PutObjectLegalHoldRequest,
1085        ) -> Result<PutObjectLegalHoldOutput, RusotoError<PutObjectLegalHoldError>> {
1086            unimplemented!()
1087        }
1088
1089        async fn put_object_lock_configuration(
1090            &self,
1091            _input: PutObjectLockConfigurationRequest,
1092        ) -> Result<PutObjectLockConfigurationOutput, RusotoError<PutObjectLockConfigurationError>>
1093        {
1094            unimplemented!()
1095        }
1096
1097        async fn put_object_retention(
1098            &self,
1099            _input: PutObjectRetentionRequest,
1100        ) -> Result<PutObjectRetentionOutput, RusotoError<PutObjectRetentionError>> {
1101            unimplemented!()
1102        }
1103
1104        async fn put_object_tagging(
1105            &self,
1106            _input: PutObjectTaggingRequest,
1107        ) -> Result<PutObjectTaggingOutput, RusotoError<PutObjectTaggingError>> {
1108            unimplemented!()
1109        }
1110
1111        async fn put_public_access_block(
1112            &self,
1113            _input: PutPublicAccessBlockRequest,
1114        ) -> Result<(), RusotoError<PutPublicAccessBlockError>> {
1115            unimplemented!()
1116        }
1117
1118        async fn restore_object(
1119            &self,
1120            _input: RestoreObjectRequest,
1121        ) -> Result<RestoreObjectOutput, RusotoError<RestoreObjectError>> {
1122            unimplemented!()
1123        }
1124
1125        async fn select_object_content(
1126            &self,
1127            _input: SelectObjectContentRequest,
1128        ) -> Result<SelectObjectContentOutput, RusotoError<SelectObjectContentError>> {
1129            unimplemented!()
1130        }
1131
1132        async fn upload_part(
1133            &self,
1134            _input: UploadPartRequest,
1135        ) -> Result<UploadPartOutput, RusotoError<UploadPartError>> {
1136            unimplemented!()
1137        }
1138
1139        async fn upload_part_copy(
1140            &self,
1141            _input: UploadPartCopyRequest,
1142        ) -> Result<UploadPartCopyOutput, RusotoError<UploadPartCopyError>> {
1143            unimplemented!()
1144        }
1145    }
1146
1147    #[test]
1148    fn it_works() {
1149        let rt = tokio::runtime::Runtime::new().unwrap();
1150        rt.enter(|| {
1151            let s3_client = Mock::default();
1152            let puts = s3_client.puts.clone();
1153            let mut client = ClientBuilder::default()
1154                .batch_duration(300)
1155                .bucket("mybucky")
1156                .s3_client(s3_client)
1157                .handle(rt.handle().clone())
1158                .build()
1159                .unwrap();
1160
1161            let data = ("12345678".to_owned().repeat(1024)).into_bytes();
1162            let len = data.len();
1163            let req = BatchPutObjectRequest {
1164                name: "test.txt".to_string(),
1165                body: data,
1166            };
1167            let put_ref = client.put_object(req).unwrap();
1168            assert_eq!(0, put_ref.offset_bytes);
1169            assert_eq!(len, put_ref.size_bytes);
1170
1171            let data = "data-driven beard".to_owned().into_bytes();
1172            let len = data.len();
1173            let req = BatchPutObjectRequest {
1174                name: "data.log".to_string(),
1175                body: data,
1176            };
1177            let put_ref = client.put_object(req).unwrap();
1178            assert_eq!(8704, put_ref.offset_bytes);
1179            assert_eq!(len, put_ref.size_bytes);
1180
1181            std::thread::sleep(std::time::Duration::from_millis(600));
1182
1183            let guard = puts.lock().unwrap();
1184            let mut result = guard.borrow_mut();
1185            let first = result.pop().unwrap();
1186            assert_eq!(first.bucket, "mybucky");
1187            assert!(!first.key.is_empty());
1188            let mut arch = tar::Archive::new(first.body.unwrap().into_blocking_read());
1189            let mut iter = arch.entries().unwrap();
1190
1191            let first = iter.next().unwrap().unwrap();
1192            assert_eq!(first.path().unwrap().to_str().unwrap(), "test.txt");
1193
1194            let first = iter.next().unwrap().unwrap();
1195            assert_eq!(first.path().unwrap().to_str().unwrap(), "data.log");
1196        });
1197    }
1198}