1use log::*;
2use rusoto_core::RusotoError;
3
4use serde_derive::Serialize;
5use std::io;
6use std::io::{Read, Seek};
7use std::sync::mpsc::TrySendError;
8
9struct Batch<W: io::Write + io::Seek> {
10 start_time_ms: u64,
11 output: tar::Builder<W>,
12}
13impl<W: io::Write + io::Seek> Batch<W> {
14 pub fn new(start_time_ms: u64, output: W) -> Batch<W> {
15 Batch {
16 start_time_ms,
17 output: tar::Builder::new(output),
18 }
19 }
20 pub fn put_object(&mut self, write: QueuedWrite) -> io::Result<()> {
21 let pos = self
22 .output
23 .get_mut()
24 .seek(io::SeekFrom::Current(0))
25 .unwrap();
26 assert_eq!(pos, write.put_ref.offset_bytes as u64);
27 let mut header = tar::Header::new_old();
28 let h = &mut header;
29 h.set_path(write.input.name).expect("set_path()");
30 h.set_mtime(write.write_time.as_secs());
31 h.set_size(write.input.body.len() as _);
32 h.set_mode(0b0110000000);
34 h.set_cksum();
36 self.output.append(&header, &write.input.body[..])?;
37 let pos = self
38 .output
39 .get_mut()
40 .seek(io::SeekFrom::Current(0))
41 .unwrap();
42 assert!(pos >= (write.put_ref.offset_bytes + write.put_ref.size_bytes) as u64);
43 Ok(())
44 }
45
46 pub fn into_inner(self) -> io::Result<W> {
47 self.output.into_inner()
48 }
49}
50
/// Reasons a batched put request can be rejected.
#[derive(Debug)]
pub enum BatchPutObjectError {
    /// The object name is too long to be stored.
    NameTooLong {
        /// Length of the rejected name (presumably in bytes).
        length: usize,
    },
    /// The queue feeding the background writer is currently full.
    QueueFull,
    /// The background writer is no longer receiving requests.
    WriterClosed,
    /// The size of the object body could not be determined.
    UnsizedBody,
}

impl std::fmt::Display for BatchPutObjectError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            BatchPutObjectError::NameTooLong { length } => {
                write!(f, "object name is too long (length {})", length)
            }
            BatchPutObjectError::QueueFull => f.write_str("write queue is full"),
            BatchPutObjectError::WriterClosed => f.write_str("batch writer has shut down"),
            BatchPutObjectError::UnsizedBody => f.write_str("object body has no known size"),
        }
    }
}

// Implementing `Error` lets callers use `?` into `Box<dyn Error>` and wrap
// this type in other error types.
impl std::error::Error for BatchPutObjectError {}
67
68struct BatchProcessor<Client: rusoto_s3::S3> {
69 rx: std::sync::mpsc::Receiver<QueuedWrite>,
70 batch_duration_ms: u128,
71 client: Client,
72 bucket: String,
74 key_prefix: String,
75 rt: tokio::runtime::Handle,
76 storage_class: Option<String>,
77}
78impl<Client: rusoto_s3::S3> BatchProcessor<Client> {
79 pub fn new(
80 rx: std::sync::mpsc::Receiver<QueuedWrite>,
81 batch_duration_ms: u128,
82 client: Client,
83 bucket: String,
84 key_prefix: String,
85 rt: tokio::runtime::Handle,
86 storage_class: Option<String>,
87 ) -> Self {
88 BatchProcessor {
89 rx,
90 batch_duration_ms,
91 client,
92 bucket,
93 rt,
94 key_prefix,
95 storage_class,
96 }
97 }
98 pub fn process(self) {
99 self.rt.block_on(self.process_async())
100 }
101 async fn process_async(&self) {
102 let mut current_batch = None;
103 let timeout = std::time::Duration::from_millis(self.batch_duration_ms as _);
104 loop {
105 match self.rx.recv_timeout(timeout) {
106 Ok(req) => {
107 if current_batch.is_none() {
108 let tmp = match tempfile::tempfile() {
109 Ok(t) => t,
110 Err(e) => {
111 log::error!("tempfile() failed: {:?}. BatchProcessor exiting", e);
112 break;
113 }
114 };
115 let batch_start = self.batch_start_ms(req.write_time);
116 current_batch = Some(Batch::new(batch_start, tmp));
117 } else {
118 let current_start = current_batch.as_ref().unwrap().start_time_ms;
119 let next_start = req.put_ref.batch_id;
120 if current_start != next_start {
121 let last = current_batch.take().unwrap();
122 if let Err(e) = self.finalise(last).await {
123 error!("finalise() failed: {:?}", e);
124 }
125 let tmp = match tempfile::tempfile() {
126 Ok(t) => t,
127 Err(e) => {
128 error!("tempfile() failed: {:?}. BatchProcessor exiting", e);
129 break;
130 }
131 };
132 current_batch = Some(Batch::new(next_start, tmp));
133 }
134 };
135 let mut batch = current_batch.take().unwrap();
136 let (res, batch) = self
137 .rt
138 .spawn_blocking(move || (batch.put_object(req), batch))
139 .await
140 .unwrap();
141 if let Err(e) = res {
142 error!("failure to write batch-object: {:?}", e);
143 } else {
148 current_batch = Some(batch);
149 }
150 }
151 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
152 if current_batch.is_some() {
153 let now = std::time::SystemTime::now();
154 let utc = now
155 .duration_since(std::time::UNIX_EPOCH)
156 .expect("duration_since()");
157 let batch_start = self.batch_start_ms(utc);
158 if batch_start > current_batch.as_ref().unwrap().start_time_ms {
159 let last = current_batch.take().unwrap();
160 if let Err(e) = self.finalise(last).await {
161 error!("finalise() failed: {:?}", e);
162 }
163 }
164 }
165 }
166 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
167 }
168 }
169 }
170
171 async fn finalise(&self, batch: Batch<std::fs::File>) -> io::Result<()> {
172 let start_time_ms = batch.start_time_ms;
173 let mut file = batch.into_inner()?;
174 let mut attempt = 0;
175 const MAX_ATTEMPTS: u32 = 3;
176 loop {
177 attempt += 1;
178 file.seek(std::io::SeekFrom::Start(0))?;
179 let mut all = vec![];
180 file.read_to_end(&mut all)?; let req = rusoto_s3::PutObjectRequest {
182 body: Some(rusoto_s3::StreamingBody::from(all)),
183 bucket: self.bucket.clone(),
184 key: format!("{}{:}.tar", self.key_prefix, start_time_ms),
185 storage_class: self.storage_class.clone(),
186 ..Default::default()
187 };
188 match self.client.put_object(req).await {
189 Ok(_output) => return Ok(()),
190 Err(e) => {
191 if attempt <= MAX_ATTEMPTS {
192 if let RusotoError::HttpDispatch(err) = e {
193 error!("put_object() failed on attempt {}: {:?}", attempt, err);
194 }
195 } else {
196 return Err(io::Error::new(io::ErrorKind::Other, e));
197 }
198 }
199 }
200 tokio::time::delay_for(std::time::Duration::from_millis(100 * (2_u64.pow(attempt))))
202 .await;
203 }
204 }
205
206 fn batch_start_ms(&self, utc: std::time::Duration) -> u64 {
207 (utc.as_millis() - (utc.as_millis() % self.batch_duration_ms)) as u64
208 }
209}
210
211struct QueuedWrite {
212 put_ref: BatchObjectRef,
216 input: BatchPutObjectRequest,
217 write_time: std::time::Duration,
218}
219
220struct Coalesce {
221 last_batch_id: Option<u64>,
222 batch_duration_ms: u64,
223 next_write_offset_bytes: usize,
224 tx: std::sync::mpsc::SyncSender<QueuedWrite>,
225}
226impl Coalesce {
227 const TAR_BLOCK_SIZE: usize = 512;
228
229 pub fn new(batch_duration_ms: u64, tx: std::sync::mpsc::SyncSender<QueuedWrite>) -> Self {
230 Coalesce {
231 last_batch_id: None,
232 batch_duration_ms,
233 next_write_offset_bytes: 0,
234 tx,
235 }
236 }
237 pub fn put_object(
238 &mut self,
239 input: BatchPutObjectRequest,
240 ) -> Result<BatchObjectRef, BatchPutObjectError> {
241 let now = std::time::SystemTime::now();
242 let utc = now
243 .duration_since(std::time::UNIX_EPOCH)
244 .expect("duration_since()");
245 let this_batch_id = self.batch_start_ms(utc);
246
247 if self.last_batch_id.is_none() {
248 self.last_batch_id = Some(this_batch_id);
249 } else if *self.last_batch_id.as_ref().unwrap() < this_batch_id {
250 self.next_write_offset_bytes = 0;
251 self.last_batch_id = Some(this_batch_id);
252 }
253 let size = input.body.len();
254 let put_ref = BatchObjectRef {
255 batch_id: *self.last_batch_id.as_ref().unwrap(),
256 offset_bytes: self.next_write_offset_bytes,
257 size_bytes: size,
258 };
259 let tar_space_required = Self::TAR_BLOCK_SIZE
260 + size
261 + if size % Self::TAR_BLOCK_SIZE == 0 {
262 0
263 } else {
264 Self::TAR_BLOCK_SIZE - size % Self::TAR_BLOCK_SIZE
265 };
266 self.next_write_offset_bytes += tar_space_required;
267
268 let write = QueuedWrite {
269 put_ref: put_ref.clone(),
270 write_time: utc,
271 input,
272 };
273 match self.tx.try_send(write) {
274 Ok(_) => Ok(put_ref),
275 Err(e) => {
276 self.next_write_offset_bytes -= tar_space_required;
278 Err(match e {
279 TrySendError::Full(_) => BatchPutObjectError::QueueFull,
280 TrySendError::Disconnected(_) => BatchPutObjectError::WriterClosed,
281 })
282 }
283 }
284 }
285
286 fn batch_start_ms(&self, utc: std::time::Duration) -> u64 {
287 let utc = utc.as_millis() as u64;
288 utc - utc % self.batch_duration_ms
289 }
290}
291
292pub struct S3BatchPutClient {
293 inner: std::sync::Arc<std::sync::Mutex<Coalesce>>,
294}
295impl S3BatchPutClient {
296 pub fn put_object(
302 &mut self,
303 input: BatchPutObjectRequest,
304 ) -> Result<BatchObjectRef, BatchPutObjectError> {
305 self.inner.lock().unwrap().put_object(input)
306 }
307}
308
/// A single object to be stored as part of a batch.
///
/// Derives `Debug` and `Clone` so requests can be logged, shown in assertion
/// messages and duplicated by callers (e.g. to retry after `QueueFull`).
#[derive(Debug, Clone)]
pub struct BatchPutObjectRequest {
    /// Name under which the object is stored inside the batch archive.
    pub name: String,
    /// The object's contents.
    pub body: Vec<u8>,
}
313
314#[derive(Serialize, Debug, Clone)]
315pub struct BatchObjectRef {
316 pub batch_id: u64,
317 pub offset_bytes: usize,
318 pub size_bytes: usize,
319}
320
321pub struct ClientBuilder<Client: rusoto_s3::S3> {
322 batch_duration: Option<u64>,
323 bucket: Option<String>,
324 key_prefix: Option<String>,
325 client: Option<Client>,
326 rt: Option<tokio::runtime::Handle>,
327 storage_class: Option<String>,
328}
329impl<Client: rusoto_s3::S3> Default for ClientBuilder<Client> {
330 fn default() -> Self {
331 ClientBuilder {
332 batch_duration: None,
333 bucket: None,
334 client: None,
335 key_prefix: None,
336 rt: None,
337 storage_class: None,
338 }
339 }
340}
341impl<Client: rusoto_s3::S3 + Send + 'static> ClientBuilder<Client> {
342 pub fn batch_duration(mut self, batch_duration: u64) -> Self {
343 self.batch_duration = Some(batch_duration);
344 self
345 }
346
347 pub fn bucket<S: ToString>(mut self, name: S) -> Self {
348 self.bucket = Some(name.to_string());
349 self
350 }
351
352 pub fn key_prefix<S: ToString>(mut self, prefix: S) -> Self {
353 self.key_prefix = Some(prefix.to_string());
354 self
355 }
356
357 pub fn s3_client(mut self, s3_client: Client) -> Self {
358 self.client = Some(s3_client);
359 self
360 }
361
362 pub fn handle(mut self, rt: tokio::runtime::Handle) -> Self {
363 self.rt = Some(rt);
364 self
365 }
366
367 pub fn storage_class(mut self, storage_class: String) -> Self {
369 self.storage_class = Some(storage_class);
370 self
371 }
372
373 pub fn build(self) -> Result<S3BatchPutClient, BuildError> {
374 let (tx, rx) = std::sync::mpsc::sync_channel(30);
375 let proc = BatchProcessor::new(
376 rx,
377 self.batch_duration
378 .ok_or(BuildError::MissingBatchDuration)? as _,
379 self.client.ok_or(BuildError::MissingS3Client)?,
380 self.bucket.ok_or(BuildError::MissingBucket)?,
381 self.key_prefix.unwrap_or_else(|| "".to_string()),
382 self.rt.ok_or(BuildError::MissingRuntime)?,
383 self.storage_class,
384 );
385 std::thread::spawn(|| proc.process());
386 let client = S3BatchPutClient {
387 inner: std::sync::Arc::new(std::sync::Mutex::new(Coalesce::new(
388 self.batch_duration
389 .ok_or(BuildError::MissingBatchDuration)? as _,
390 tx,
391 ))),
392 };
393 Ok(client)
394 }
395}
396
/// Reasons `ClientBuilder::build` can fail: a required setting was not
/// supplied.
#[derive(Debug)]
pub enum BuildError {
    /// No bucket was set.
    MissingBucket,
    /// No batch duration was set.
    MissingBatchDuration,
    /// No S3 client was set.
    MissingS3Client,
    /// No runtime handle was set.
    MissingRuntime,
}

impl std::fmt::Display for BuildError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let missing = match self {
            BuildError::MissingBucket => "bucket",
            BuildError::MissingBatchDuration => "batch duration",
            BuildError::MissingS3Client => "S3 client",
            BuildError::MissingRuntime => "runtime handle",
        };
        write!(f, "missing required setting: {}", missing)
    }
}

// Implementing `Error` lets callers use `?` into `Box<dyn Error>` and wrap
// this type in other error types.
impl std::error::Error for BuildError {}
404
405#[cfg(test)]
406mod tests {
407 use super::*;
408 use async_trait::async_trait;
409 use rusoto_core::RusotoError;
410 use rusoto_s3::{
411 AbortMultipartUploadError, AbortMultipartUploadOutput, AbortMultipartUploadRequest,
412 CompleteMultipartUploadError, CompleteMultipartUploadOutput,
413 CompleteMultipartUploadRequest, CopyObjectError, CopyObjectOutput, CopyObjectRequest,
414 CreateBucketError, CreateBucketOutput, CreateBucketRequest, CreateMultipartUploadError,
415 CreateMultipartUploadOutput, CreateMultipartUploadRequest,
416 DeleteBucketAnalyticsConfigurationError, DeleteBucketAnalyticsConfigurationRequest,
417 DeleteBucketCorsError, DeleteBucketCorsRequest, DeleteBucketEncryptionError,
418 DeleteBucketEncryptionRequest, DeleteBucketError, DeleteBucketInventoryConfigurationError,
419 DeleteBucketInventoryConfigurationRequest, DeleteBucketLifecycleError,
420 DeleteBucketLifecycleRequest, DeleteBucketMetricsConfigurationError,
421 DeleteBucketMetricsConfigurationRequest, DeleteBucketPolicyError,
422 DeleteBucketPolicyRequest, DeleteBucketReplicationError, DeleteBucketReplicationRequest,
423 DeleteBucketRequest, DeleteBucketTaggingError, DeleteBucketTaggingRequest,
424 DeleteBucketWebsiteError, DeleteBucketWebsiteRequest, DeleteObjectError,
425 DeleteObjectOutput, DeleteObjectRequest, DeleteObjectTaggingError,
426 DeleteObjectTaggingOutput, DeleteObjectTaggingRequest, DeleteObjectsError,
427 DeleteObjectsOutput, DeleteObjectsRequest, DeletePublicAccessBlockError,
428 DeletePublicAccessBlockRequest, GetBucketAccelerateConfigurationError,
429 GetBucketAccelerateConfigurationOutput, GetBucketAccelerateConfigurationRequest,
430 GetBucketAclError, GetBucketAclOutput, GetBucketAclRequest,
431 GetBucketAnalyticsConfigurationError, GetBucketAnalyticsConfigurationOutput,
432 GetBucketAnalyticsConfigurationRequest, GetBucketCorsError, GetBucketCorsOutput,
433 GetBucketCorsRequest, GetBucketEncryptionError, GetBucketEncryptionOutput,
434 GetBucketEncryptionRequest, GetBucketInventoryConfigurationError,
435 GetBucketInventoryConfigurationOutput, GetBucketInventoryConfigurationRequest,
436 GetBucketLifecycleConfigurationError, GetBucketLifecycleConfigurationOutput,
437 GetBucketLifecycleConfigurationRequest, GetBucketLifecycleError, GetBucketLifecycleOutput,
438 GetBucketLifecycleRequest, GetBucketLocationError, GetBucketLocationOutput,
439 GetBucketLocationRequest, GetBucketLoggingError, GetBucketLoggingOutput,
440 GetBucketLoggingRequest, GetBucketMetricsConfigurationError,
441 GetBucketMetricsConfigurationOutput, GetBucketMetricsConfigurationRequest,
442 GetBucketNotificationConfigurationError, GetBucketNotificationConfigurationRequest,
443 GetBucketNotificationError, GetBucketPolicyError, GetBucketPolicyOutput,
444 GetBucketPolicyRequest, GetBucketPolicyStatusError, GetBucketPolicyStatusOutput,
445 GetBucketPolicyStatusRequest, GetBucketReplicationError, GetBucketReplicationOutput,
446 GetBucketReplicationRequest, GetBucketRequestPaymentError, GetBucketRequestPaymentOutput,
447 GetBucketRequestPaymentRequest, GetBucketTaggingError, GetBucketTaggingOutput,
448 GetBucketTaggingRequest, GetBucketVersioningError, GetBucketVersioningOutput,
449 GetBucketVersioningRequest, GetBucketWebsiteError, GetBucketWebsiteOutput,
450 GetBucketWebsiteRequest, GetObjectAclError, GetObjectAclOutput, GetObjectAclRequest,
451 GetObjectError, GetObjectLegalHoldError, GetObjectLegalHoldOutput,
452 GetObjectLegalHoldRequest, GetObjectLockConfigurationError,
453 GetObjectLockConfigurationOutput, GetObjectLockConfigurationRequest, GetObjectOutput,
454 GetObjectRequest, GetObjectRetentionError, GetObjectRetentionOutput,
455 GetObjectRetentionRequest, GetObjectTaggingError, GetObjectTaggingOutput,
456 GetObjectTaggingRequest, GetObjectTorrentError, GetObjectTorrentOutput,
457 GetObjectTorrentRequest, GetPublicAccessBlockError, GetPublicAccessBlockOutput,
458 GetPublicAccessBlockRequest, HeadBucketError, HeadBucketRequest, HeadObjectError,
459 HeadObjectOutput, HeadObjectRequest, ListBucketAnalyticsConfigurationsError,
460 ListBucketAnalyticsConfigurationsOutput, ListBucketAnalyticsConfigurationsRequest,
461 ListBucketInventoryConfigurationsError, ListBucketInventoryConfigurationsOutput,
462 ListBucketInventoryConfigurationsRequest, ListBucketMetricsConfigurationsError,
463 ListBucketMetricsConfigurationsOutput, ListBucketMetricsConfigurationsRequest,
464 ListBucketsError, ListBucketsOutput, ListMultipartUploadsError, ListMultipartUploadsOutput,
465 ListMultipartUploadsRequest, ListObjectVersionsError, ListObjectVersionsOutput,
466 ListObjectVersionsRequest, ListObjectsError, ListObjectsOutput, ListObjectsRequest,
467 ListObjectsV2Error, ListObjectsV2Output, ListObjectsV2Request, ListPartsError,
468 ListPartsOutput, ListPartsRequest, NotificationConfiguration,
469 NotificationConfigurationDeprecated, PutBucketAccelerateConfigurationError,
470 PutBucketAccelerateConfigurationRequest, PutBucketAclError, PutBucketAclRequest,
471 PutBucketAnalyticsConfigurationError, PutBucketAnalyticsConfigurationRequest,
472 PutBucketCorsError, PutBucketCorsRequest, PutBucketEncryptionError,
473 PutBucketEncryptionRequest, PutBucketInventoryConfigurationError,
474 PutBucketInventoryConfigurationRequest, PutBucketLifecycleConfigurationError,
475 PutBucketLifecycleConfigurationRequest, PutBucketLifecycleError, PutBucketLifecycleRequest,
476 PutBucketLoggingError, PutBucketLoggingRequest, PutBucketMetricsConfigurationError,
477 PutBucketMetricsConfigurationRequest, PutBucketNotificationConfigurationError,
478 PutBucketNotificationConfigurationRequest, PutBucketNotificationError,
479 PutBucketNotificationRequest, PutBucketPolicyError, PutBucketPolicyRequest,
480 PutBucketReplicationError, PutBucketReplicationRequest, PutBucketRequestPaymentError,
481 PutBucketRequestPaymentRequest, PutBucketTaggingError, PutBucketTaggingRequest,
482 PutBucketVersioningError, PutBucketVersioningRequest, PutBucketWebsiteError,
483 PutBucketWebsiteRequest, PutObjectAclError, PutObjectAclOutput, PutObjectAclRequest,
484 PutObjectError, PutObjectLegalHoldError, PutObjectLegalHoldOutput,
485 PutObjectLegalHoldRequest, PutObjectLockConfigurationError,
486 PutObjectLockConfigurationOutput, PutObjectLockConfigurationRequest, PutObjectOutput,
487 PutObjectRequest, PutObjectRetentionError, PutObjectRetentionOutput,
488 PutObjectRetentionRequest, PutObjectTaggingError, PutObjectTaggingOutput,
489 PutObjectTaggingRequest, PutPublicAccessBlockError, PutPublicAccessBlockRequest,
490 RestoreObjectError, RestoreObjectOutput, RestoreObjectRequest, SelectObjectContentError,
491 SelectObjectContentOutput, SelectObjectContentRequest, UploadPartCopyError,
492 UploadPartCopyOutput, UploadPartCopyRequest, UploadPartError, UploadPartOutput,
493 UploadPartRequest,
494 };
495
496 #[derive(Default)]
497 struct Mock {
498 puts: std::sync::Arc<std::sync::Mutex<std::cell::RefCell<Vec<PutObjectRequest>>>>,
499 }
500 #[async_trait]
501 impl rusoto_s3::S3 for Mock {
502 async fn abort_multipart_upload(
503 &self,
504 _input: AbortMultipartUploadRequest,
505 ) -> Result<AbortMultipartUploadOutput, RusotoError<AbortMultipartUploadError>> {
506 unimplemented!()
507 }
508
509 async fn complete_multipart_upload(
510 &self,
511 _input: CompleteMultipartUploadRequest,
512 ) -> Result<CompleteMultipartUploadOutput, RusotoError<CompleteMultipartUploadError>>
513 {
514 unimplemented!()
515 }
516
517 async fn copy_object(
518 &self,
519 _input: CopyObjectRequest,
520 ) -> Result<CopyObjectOutput, RusotoError<CopyObjectError>> {
521 unimplemented!()
522 }
523
524 async fn create_bucket(
525 &self,
526 _input: CreateBucketRequest,
527 ) -> Result<CreateBucketOutput, RusotoError<CreateBucketError>> {
528 unimplemented!()
529 }
530
531 async fn create_multipart_upload(
532 &self,
533 _input: CreateMultipartUploadRequest,
534 ) -> Result<CreateMultipartUploadOutput, RusotoError<CreateMultipartUploadError>> {
535 unimplemented!()
536 }
537
538 async fn delete_bucket(
539 &self,
540 _input: DeleteBucketRequest,
541 ) -> Result<(), RusotoError<DeleteBucketError>> {
542 unimplemented!()
543 }
544
545 async fn delete_bucket_analytics_configuration(
546 &self,
547 _input: DeleteBucketAnalyticsConfigurationRequest,
548 ) -> Result<(), RusotoError<DeleteBucketAnalyticsConfigurationError>> {
549 unimplemented!()
550 }
551
552 async fn delete_bucket_cors(
553 &self,
554 _input: DeleteBucketCorsRequest,
555 ) -> Result<(), RusotoError<DeleteBucketCorsError>> {
556 unimplemented!()
557 }
558
559 async fn delete_bucket_encryption(
560 &self,
561 _input: DeleteBucketEncryptionRequest,
562 ) -> Result<(), RusotoError<DeleteBucketEncryptionError>> {
563 unimplemented!()
564 }
565
566 async fn delete_bucket_inventory_configuration(
567 &self,
568 _input: DeleteBucketInventoryConfigurationRequest,
569 ) -> Result<(), RusotoError<DeleteBucketInventoryConfigurationError>> {
570 unimplemented!()
571 }
572
573 async fn delete_bucket_lifecycle(
574 &self,
575 _input: DeleteBucketLifecycleRequest,
576 ) -> Result<(), RusotoError<DeleteBucketLifecycleError>> {
577 unimplemented!()
578 }
579
580 async fn delete_bucket_metrics_configuration(
581 &self,
582 _input: DeleteBucketMetricsConfigurationRequest,
583 ) -> Result<(), RusotoError<DeleteBucketMetricsConfigurationError>> {
584 unimplemented!()
585 }
586
587 async fn delete_bucket_policy(
588 &self,
589 _input: DeleteBucketPolicyRequest,
590 ) -> Result<(), RusotoError<DeleteBucketPolicyError>> {
591 unimplemented!()
592 }
593
594 async fn delete_bucket_replication(
595 &self,
596 _input: DeleteBucketReplicationRequest,
597 ) -> Result<(), RusotoError<DeleteBucketReplicationError>> {
598 unimplemented!()
599 }
600
601 async fn delete_bucket_tagging(
602 &self,
603 _input: DeleteBucketTaggingRequest,
604 ) -> Result<(), RusotoError<DeleteBucketTaggingError>> {
605 unimplemented!()
606 }
607
608 async fn delete_bucket_website(
609 &self,
610 _input: DeleteBucketWebsiteRequest,
611 ) -> Result<(), RusotoError<DeleteBucketWebsiteError>> {
612 unimplemented!()
613 }
614
615 async fn delete_object(
616 &self,
617 _input: DeleteObjectRequest,
618 ) -> Result<DeleteObjectOutput, RusotoError<DeleteObjectError>> {
619 unimplemented!()
620 }
621
622 async fn delete_object_tagging(
623 &self,
624 _input: DeleteObjectTaggingRequest,
625 ) -> Result<DeleteObjectTaggingOutput, RusotoError<DeleteObjectTaggingError>> {
626 unimplemented!()
627 }
628
629 async fn delete_objects(
630 &self,
631 _input: DeleteObjectsRequest,
632 ) -> Result<DeleteObjectsOutput, RusotoError<DeleteObjectsError>> {
633 unimplemented!()
634 }
635
636 async fn delete_public_access_block(
637 &self,
638 _input: DeletePublicAccessBlockRequest,
639 ) -> Result<(), RusotoError<DeletePublicAccessBlockError>> {
640 unimplemented!()
641 }
642
643 async fn get_bucket_accelerate_configuration(
644 &self,
645 _input: GetBucketAccelerateConfigurationRequest,
646 ) -> Result<
647 GetBucketAccelerateConfigurationOutput,
648 RusotoError<GetBucketAccelerateConfigurationError>,
649 > {
650 unimplemented!()
651 }
652
653 async fn get_bucket_acl(
654 &self,
655 _input: GetBucketAclRequest,
656 ) -> Result<GetBucketAclOutput, RusotoError<GetBucketAclError>> {
657 unimplemented!()
658 }
659
660 async fn get_bucket_analytics_configuration(
661 &self,
662 _input: GetBucketAnalyticsConfigurationRequest,
663 ) -> Result<
664 GetBucketAnalyticsConfigurationOutput,
665 RusotoError<GetBucketAnalyticsConfigurationError>,
666 > {
667 unimplemented!()
668 }
669
670 async fn get_bucket_cors(
671 &self,
672 _input: GetBucketCorsRequest,
673 ) -> Result<GetBucketCorsOutput, RusotoError<GetBucketCorsError>> {
674 unimplemented!()
675 }
676
677 async fn get_bucket_encryption(
678 &self,
679 _input: GetBucketEncryptionRequest,
680 ) -> Result<GetBucketEncryptionOutput, RusotoError<GetBucketEncryptionError>> {
681 unimplemented!()
682 }
683
684 async fn get_bucket_inventory_configuration(
685 &self,
686 _input: GetBucketInventoryConfigurationRequest,
687 ) -> Result<
688 GetBucketInventoryConfigurationOutput,
689 RusotoError<GetBucketInventoryConfigurationError>,
690 > {
691 unimplemented!()
692 }
693
694 async fn get_bucket_lifecycle(
695 &self,
696 _input: GetBucketLifecycleRequest,
697 ) -> Result<GetBucketLifecycleOutput, RusotoError<GetBucketLifecycleError>> {
698 unimplemented!()
699 }
700
701 async fn get_bucket_lifecycle_configuration(
702 &self,
703 _input: GetBucketLifecycleConfigurationRequest,
704 ) -> Result<
705 GetBucketLifecycleConfigurationOutput,
706 RusotoError<GetBucketLifecycleConfigurationError>,
707 > {
708 unimplemented!()
709 }
710
711 async fn get_bucket_location(
712 &self,
713 _input: GetBucketLocationRequest,
714 ) -> Result<GetBucketLocationOutput, RusotoError<GetBucketLocationError>> {
715 unimplemented!()
716 }
717
718 async fn get_bucket_logging(
719 &self,
720 _input: GetBucketLoggingRequest,
721 ) -> Result<GetBucketLoggingOutput, RusotoError<GetBucketLoggingError>> {
722 unimplemented!()
723 }
724
725 async fn get_bucket_metrics_configuration(
726 &self,
727 _input: GetBucketMetricsConfigurationRequest,
728 ) -> Result<
729 GetBucketMetricsConfigurationOutput,
730 RusotoError<GetBucketMetricsConfigurationError>,
731 > {
732 unimplemented!()
733 }
734
735 async fn get_bucket_notification(
736 &self,
737 _input: GetBucketNotificationConfigurationRequest,
738 ) -> Result<NotificationConfigurationDeprecated, RusotoError<GetBucketNotificationError>>
739 {
740 unimplemented!()
741 }
742
743 async fn get_bucket_notification_configuration(
744 &self,
745 _input: GetBucketNotificationConfigurationRequest,
746 ) -> Result<NotificationConfiguration, RusotoError<GetBucketNotificationConfigurationError>>
747 {
748 unimplemented!()
749 }
750
751 async fn get_bucket_policy(
752 &self,
753 _input: GetBucketPolicyRequest,
754 ) -> Result<GetBucketPolicyOutput, RusotoError<GetBucketPolicyError>> {
755 unimplemented!()
756 }
757
758 async fn get_bucket_policy_status(
759 &self,
760 _input: GetBucketPolicyStatusRequest,
761 ) -> Result<GetBucketPolicyStatusOutput, RusotoError<GetBucketPolicyStatusError>> {
762 unimplemented!()
763 }
764
765 async fn get_bucket_replication(
766 &self,
767 _input: GetBucketReplicationRequest,
768 ) -> Result<GetBucketReplicationOutput, RusotoError<GetBucketReplicationError>> {
769 unimplemented!()
770 }
771
772 async fn get_bucket_request_payment(
773 &self,
774 _input: GetBucketRequestPaymentRequest,
775 ) -> Result<GetBucketRequestPaymentOutput, RusotoError<GetBucketRequestPaymentError>>
776 {
777 unimplemented!()
778 }
779
780 async fn get_bucket_tagging(
781 &self,
782 _input: GetBucketTaggingRequest,
783 ) -> Result<GetBucketTaggingOutput, RusotoError<GetBucketTaggingError>> {
784 unimplemented!()
785 }
786
787 async fn get_bucket_versioning(
788 &self,
789 _input: GetBucketVersioningRequest,
790 ) -> Result<GetBucketVersioningOutput, RusotoError<GetBucketVersioningError>> {
791 unimplemented!()
792 }
793
794 async fn get_bucket_website(
795 &self,
796 _input: GetBucketWebsiteRequest,
797 ) -> Result<GetBucketWebsiteOutput, RusotoError<GetBucketWebsiteError>> {
798 unimplemented!()
799 }
800
801 async fn get_object(
802 &self,
803 _input: GetObjectRequest,
804 ) -> Result<GetObjectOutput, RusotoError<GetObjectError>> {
805 unimplemented!()
806 }
807
808 async fn get_object_acl(
809 &self,
810 _input: GetObjectAclRequest,
811 ) -> Result<GetObjectAclOutput, RusotoError<GetObjectAclError>> {
812 unimplemented!()
813 }
814
815 async fn get_object_legal_hold(
816 &self,
817 _input: GetObjectLegalHoldRequest,
818 ) -> Result<GetObjectLegalHoldOutput, RusotoError<GetObjectLegalHoldError>> {
819 unimplemented!()
820 }
821
822 async fn get_object_lock_configuration(
823 &self,
824 _input: GetObjectLockConfigurationRequest,
825 ) -> Result<GetObjectLockConfigurationOutput, RusotoError<GetObjectLockConfigurationError>>
826 {
827 unimplemented!()
828 }
829
830 async fn get_object_retention(
831 &self,
832 _input: GetObjectRetentionRequest,
833 ) -> Result<GetObjectRetentionOutput, RusotoError<GetObjectRetentionError>> {
834 unimplemented!()
835 }
836
837 async fn get_object_tagging(
838 &self,
839 _input: GetObjectTaggingRequest,
840 ) -> Result<GetObjectTaggingOutput, RusotoError<GetObjectTaggingError>> {
841 unimplemented!()
842 }
843
844 async fn get_object_torrent(
845 &self,
846 _input: GetObjectTorrentRequest,
847 ) -> Result<GetObjectTorrentOutput, RusotoError<GetObjectTorrentError>> {
848 unimplemented!()
849 }
850
851 async fn get_public_access_block(
852 &self,
853 _input: GetPublicAccessBlockRequest,
854 ) -> Result<GetPublicAccessBlockOutput, RusotoError<GetPublicAccessBlockError>> {
855 unimplemented!()
856 }
857
858 async fn head_bucket(
859 &self,
860 _input: HeadBucketRequest,
861 ) -> Result<(), RusotoError<HeadBucketError>> {
862 unimplemented!()
863 }
864
865 async fn head_object(
866 &self,
867 _input: HeadObjectRequest,
868 ) -> Result<HeadObjectOutput, RusotoError<HeadObjectError>> {
869 unimplemented!()
870 }
871
872 async fn list_bucket_analytics_configurations(
873 &self,
874 _input: ListBucketAnalyticsConfigurationsRequest,
875 ) -> Result<
876 ListBucketAnalyticsConfigurationsOutput,
877 RusotoError<ListBucketAnalyticsConfigurationsError>,
878 > {
879 unimplemented!()
880 }
881
882 async fn list_bucket_inventory_configurations(
883 &self,
884 _input: ListBucketInventoryConfigurationsRequest,
885 ) -> Result<
886 ListBucketInventoryConfigurationsOutput,
887 RusotoError<ListBucketInventoryConfigurationsError>,
888 > {
889 unimplemented!()
890 }
891
892 async fn list_bucket_metrics_configurations(
893 &self,
894 _input: ListBucketMetricsConfigurationsRequest,
895 ) -> Result<
896 ListBucketMetricsConfigurationsOutput,
897 RusotoError<ListBucketMetricsConfigurationsError>,
898 > {
899 unimplemented!()
900 }
901
902 async fn list_buckets(&self) -> Result<ListBucketsOutput, RusotoError<ListBucketsError>> {
903 unimplemented!()
904 }
905
906 async fn list_multipart_uploads(
907 &self,
908 _input: ListMultipartUploadsRequest,
909 ) -> Result<ListMultipartUploadsOutput, RusotoError<ListMultipartUploadsError>> {
910 unimplemented!()
911 }
912
913 async fn list_object_versions(
914 &self,
915 _input: ListObjectVersionsRequest,
916 ) -> Result<ListObjectVersionsOutput, RusotoError<ListObjectVersionsError>> {
917 unimplemented!()
918 }
919
920 async fn list_objects(
921 &self,
922 _input: ListObjectsRequest,
923 ) -> Result<ListObjectsOutput, RusotoError<ListObjectsError>> {
924 unimplemented!()
925 }
926
927 async fn list_objects_v2(
928 &self,
929 _input: ListObjectsV2Request,
930 ) -> Result<ListObjectsV2Output, RusotoError<ListObjectsV2Error>> {
931 unimplemented!()
932 }
933
934 async fn list_parts(
935 &self,
936 _input: ListPartsRequest,
937 ) -> Result<ListPartsOutput, RusotoError<ListPartsError>> {
938 unimplemented!()
939 }
940
941 async fn put_bucket_accelerate_configuration(
942 &self,
943 _input: PutBucketAccelerateConfigurationRequest,
944 ) -> Result<(), RusotoError<PutBucketAccelerateConfigurationError>> {
945 unimplemented!()
946 }
947
948 async fn put_bucket_acl(
949 &self,
950 _input: PutBucketAclRequest,
951 ) -> Result<(), RusotoError<PutBucketAclError>> {
952 unimplemented!()
953 }
954
955 async fn put_bucket_analytics_configuration(
956 &self,
957 _input: PutBucketAnalyticsConfigurationRequest,
958 ) -> Result<(), RusotoError<PutBucketAnalyticsConfigurationError>> {
959 unimplemented!()
960 }
961
962 async fn put_bucket_cors(
963 &self,
964 _input: PutBucketCorsRequest,
965 ) -> Result<(), RusotoError<PutBucketCorsError>> {
966 unimplemented!()
967 }
968
969 async fn put_bucket_encryption(
970 &self,
971 _input: PutBucketEncryptionRequest,
972 ) -> Result<(), RusotoError<PutBucketEncryptionError>> {
973 unimplemented!()
974 }
975
976 async fn put_bucket_inventory_configuration(
977 &self,
978 _input: PutBucketInventoryConfigurationRequest,
979 ) -> Result<(), RusotoError<PutBucketInventoryConfigurationError>> {
980 unimplemented!()
981 }
982
983 async fn put_bucket_lifecycle(
984 &self,
985 _input: PutBucketLifecycleRequest,
986 ) -> Result<(), RusotoError<PutBucketLifecycleError>> {
987 unimplemented!()
988 }
989
990 async fn put_bucket_lifecycle_configuration(
991 &self,
992 _input: PutBucketLifecycleConfigurationRequest,
993 ) -> Result<(), RusotoError<PutBucketLifecycleConfigurationError>> {
994 unimplemented!()
995 }
996
997 async fn put_bucket_logging(
998 &self,
999 _input: PutBucketLoggingRequest,
1000 ) -> Result<(), RusotoError<PutBucketLoggingError>> {
1001 unimplemented!()
1002 }
1003
1004 async fn put_bucket_metrics_configuration(
1005 &self,
1006 _input: PutBucketMetricsConfigurationRequest,
1007 ) -> Result<(), RusotoError<PutBucketMetricsConfigurationError>> {
1008 unimplemented!()
1009 }
1010
1011 async fn put_bucket_notification(
1012 &self,
1013 _input: PutBucketNotificationRequest,
1014 ) -> Result<(), RusotoError<PutBucketNotificationError>> {
1015 unimplemented!()
1016 }
1017
1018 async fn put_bucket_notification_configuration(
1019 &self,
1020 _input: PutBucketNotificationConfigurationRequest,
1021 ) -> Result<(), RusotoError<PutBucketNotificationConfigurationError>> {
1022 unimplemented!()
1023 }
1024
1025 async fn put_bucket_policy(
1026 &self,
1027 _input: PutBucketPolicyRequest,
1028 ) -> Result<(), RusotoError<PutBucketPolicyError>> {
1029 unimplemented!()
1030 }
1031
1032 async fn put_bucket_replication(
1033 &self,
1034 _input: PutBucketReplicationRequest,
1035 ) -> Result<(), RusotoError<PutBucketReplicationError>> {
1036 unimplemented!()
1037 }
1038
1039 async fn put_bucket_request_payment(
1040 &self,
1041 _input: PutBucketRequestPaymentRequest,
1042 ) -> Result<(), RusotoError<PutBucketRequestPaymentError>> {
1043 unimplemented!()
1044 }
1045
1046 async fn put_bucket_tagging(
1047 &self,
1048 _input: PutBucketTaggingRequest,
1049 ) -> Result<(), RusotoError<PutBucketTaggingError>> {
1050 unimplemented!()
1051 }
1052
1053 async fn put_bucket_versioning(
1054 &self,
1055 _input: PutBucketVersioningRequest,
1056 ) -> Result<(), RusotoError<PutBucketVersioningError>> {
1057 unimplemented!()
1058 }
1059
1060 async fn put_bucket_website(
1061 &self,
1062 _input: PutBucketWebsiteRequest,
1063 ) -> Result<(), RusotoError<PutBucketWebsiteError>> {
1064 unimplemented!()
1065 }
1066
1067 async fn put_object(
1068 &self,
1069 input: PutObjectRequest,
1070 ) -> Result<PutObjectOutput, RusotoError<PutObjectError>> {
1071 self.puts.lock().unwrap().borrow_mut().push(input);
1072 Ok(PutObjectOutput::default())
1073 }
1074
1075 async fn put_object_acl(
1076 &self,
1077 _input: PutObjectAclRequest,
1078 ) -> Result<PutObjectAclOutput, RusotoError<PutObjectAclError>> {
1079 unimplemented!()
1080 }
1081
1082 async fn put_object_legal_hold(
1083 &self,
1084 _input: PutObjectLegalHoldRequest,
1085 ) -> Result<PutObjectLegalHoldOutput, RusotoError<PutObjectLegalHoldError>> {
1086 unimplemented!()
1087 }
1088
1089 async fn put_object_lock_configuration(
1090 &self,
1091 _input: PutObjectLockConfigurationRequest,
1092 ) -> Result<PutObjectLockConfigurationOutput, RusotoError<PutObjectLockConfigurationError>>
1093 {
1094 unimplemented!()
1095 }
1096
1097 async fn put_object_retention(
1098 &self,
1099 _input: PutObjectRetentionRequest,
1100 ) -> Result<PutObjectRetentionOutput, RusotoError<PutObjectRetentionError>> {
1101 unimplemented!()
1102 }
1103
1104 async fn put_object_tagging(
1105 &self,
1106 _input: PutObjectTaggingRequest,
1107 ) -> Result<PutObjectTaggingOutput, RusotoError<PutObjectTaggingError>> {
1108 unimplemented!()
1109 }
1110
1111 async fn put_public_access_block(
1112 &self,
1113 _input: PutPublicAccessBlockRequest,
1114 ) -> Result<(), RusotoError<PutPublicAccessBlockError>> {
1115 unimplemented!()
1116 }
1117
1118 async fn restore_object(
1119 &self,
1120 _input: RestoreObjectRequest,
1121 ) -> Result<RestoreObjectOutput, RusotoError<RestoreObjectError>> {
1122 unimplemented!()
1123 }
1124
1125 async fn select_object_content(
1126 &self,
1127 _input: SelectObjectContentRequest,
1128 ) -> Result<SelectObjectContentOutput, RusotoError<SelectObjectContentError>> {
1129 unimplemented!()
1130 }
1131
1132 async fn upload_part(
1133 &self,
1134 _input: UploadPartRequest,
1135 ) -> Result<UploadPartOutput, RusotoError<UploadPartError>> {
1136 unimplemented!()
1137 }
1138
1139 async fn upload_part_copy(
1140 &self,
1141 _input: UploadPartCopyRequest,
1142 ) -> Result<UploadPartCopyOutput, RusotoError<UploadPartCopyError>> {
1143 unimplemented!()
1144 }
1145 }
1146
/// End-to-end check: two objects are queued through the client, and the
/// single upload captured by the mock must be a tar archive listing both
/// names in submission order.
#[test]
fn it_works() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.enter(|| {
        let mock = Mock::default();
        // Keep a handle on the mock's recorded requests before the client takes the mock.
        let recorded_puts = mock.puts.clone();
        let mut client = ClientBuilder::default()
            .batch_duration(300)
            .bucket("mybucky")
            .s3_client(mock)
            .handle(rt.handle().clone())
            .build()
            .unwrap();

        // First object: 8 bytes repeated 1024 times, expected at offset 0.
        let payload = "12345678".repeat(1024).into_bytes();
        let payload_len = payload.len();
        let first_ref = client
            .put_object(BatchPutObjectRequest {
                name: "test.txt".to_string(),
                body: payload,
            })
            .unwrap();
        assert_eq!(0, first_ref.offset_bytes);
        assert_eq!(payload_len, first_ref.size_bytes);

        // Second object: expected at offset 8704, i.e. 512 bytes past the
        // first payload (presumably the tar header block; confirm against
        // the batch writer).
        let payload = String::from("data-driven beard").into_bytes();
        let payload_len = payload.len();
        let second_ref = client
            .put_object(BatchPutObjectRequest {
                name: "data.log".to_string(),
                body: payload,
            })
            .unwrap();
        assert_eq!(8704, second_ref.offset_bytes);
        assert_eq!(payload_len, second_ref.size_bytes);

        // NOTE(review): timing-dependent; presumably waits out the batch
        // duration of 300 configured above so the upload has happened.
        std::thread::sleep(std::time::Duration::from_millis(600));

        let recorded = recorded_puts.lock().unwrap();
        let mut uploads = recorded.borrow_mut();
        let upload = uploads.pop().unwrap();
        assert_eq!(upload.bucket, "mybucky");
        assert!(!upload.key.is_empty());

        // Read the uploaded body back as a tar archive and walk its entries.
        let body = upload.body.unwrap();
        let mut archive = tar::Archive::new(body.into_blocking_read());
        let mut entries = archive.entries().unwrap();

        for expected in ["test.txt", "data.log"].iter() {
            let entry = entries.next().unwrap().unwrap();
            assert_eq!(entry.path().unwrap().to_str().unwrap(), *expected);
        }
    });
}
1198}