qiniu_objects_manager/
batch_operations.rs

1use super::{callbacks::Callbacks, list::make_callback_error, Bucket, OperationProvider};
2use anyhow::{Error as AnyError, Result as AnyResult};
3use assert_impl::assert_impl;
4use auto_impl::auto_impl;
5use dyn_clonable::clonable;
6use qiniu_apis::{
7    http::{ResponseErrorKind as HttpResponseErrorKind, ResponseParts, StatusCode},
8    http_client::{
9        ApiResult, RegionsProvider, RegionsProviderEndpoints, RequestBuilderParts, Response, ResponseError,
10        ResponseErrorKind,
11    },
12    storage::batch_ops::{
13        OperationResponse, OperationResponseData, RequestBody, ResponseBody,
14        SyncRequestBuilder as BatchOpsSyncRequestBuilder,
15    },
16};
17use std::{
18    collections::VecDeque,
19    error::Error as StdError,
20    fmt::{self, Debug, Display},
21    mem::take,
22};
23
24/// 最大批量操作数获取接口
25#[clonable]
26#[auto_impl(&, &mut, Box, Rc, Arc)]
27pub trait BatchSizeProvider: Clone + Debug + Send + Sync {
28    /// 获取最大批量操作数
29    fn batch_size(&self) -> usize;
30}
31
32impl BatchSizeProvider for usize {
33    #[inline]
34    fn batch_size(&self) -> usize {
35        *self
36    }
37}
38
39/// 批量操作
40pub struct BatchOperations<'a> {
41    bucket: &'a Bucket,
42    operations: Option<Box<dyn Iterator<Item = Box<dyn OperationProvider + 'a>> + Send + Sync + 'a>>,
43    batch_size: Option<Box<dyn BatchSizeProvider + 'a>>,
44    callbacks: Callbacks<'a>,
45}
46
47impl<'a> BatchOperations<'a> {
48    pub(super) fn new(bucket: &'a Bucket) -> Self {
49        Self {
50            bucket,
51            operations: Default::default(),
52            batch_size: Default::default(),
53            callbacks: Default::default(),
54        }
55    }
56
57    /// 设置最大批量操作数提供者
58    #[inline]
59    pub fn batch_size(&mut self, batch_size: impl BatchSizeProvider + 'a) -> &mut Self {
60        self.batch_size = Some(Box::new(batch_size));
61        self
62    }
63
64    /// 添加对象操作提供者
65    #[inline]
66    pub fn add_operation(&mut self, operation: impl OperationProvider + 'a) -> &mut Self {
67        let new_iter = vec![Box::new(operation) as Box<dyn OperationProvider + 'a>].into_iter();
68        self.add_operations(new_iter)
69    }
70
71    /// 批量添加操作提供者
72    #[inline]
73    pub fn add_operations<I: IntoIterator<Item = Box<dyn OperationProvider + 'a>> + Send + Sync + 'a>(
74        &mut self,
75        new_iter: I,
76    ) -> &mut Self
77    where
78        <I as IntoIterator>::IntoIter: Sync + Send,
79    {
80        if let Some(iter) = take(&mut self.operations) {
81            self.operations = Some(Box::new(iter.chain(new_iter)));
82        } else {
83            self.operations = Some(Box::new(new_iter.into_iter()));
84        }
85        self
86    }
87
88    /// 设置请求前回调函数
89    #[inline]
90    pub fn before_request_callback(
91        &mut self,
92        callback: impl FnMut(&mut RequestBuilderParts<'_>) -> AnyResult<()> + Send + Sync + 'a,
93    ) -> &mut Self {
94        self.callbacks.insert_before_request_callback(callback);
95        self
96    }
97
98    /// 设置响应成功回调函数
99    #[inline]
100    pub fn after_response_ok_callback(
101        &mut self,
102        callback: impl FnMut(&mut ResponseParts) -> AnyResult<()> + Send + Sync + 'a,
103    ) -> &mut Self {
104        self.callbacks.insert_after_response_ok_callback(callback);
105        self
106    }
107
108    /// 设置响应失败回调函数
109    #[inline]
110    pub fn after_response_error_callback(
111        &mut self,
112        callback: impl FnMut(&mut ResponseError) -> AnyResult<()> + Send + Sync + 'a,
113    ) -> &mut Self {
114        self.callbacks.insert_after_response_error_callback(callback);
115        self
116    }
117
118    /// 阻塞发起批量操作,返回操作结果迭代器
119    ///
120    /// 该方法的的异步版本为 [`Self::async_call`]。
121    #[inline]
122    pub fn call(&mut self) -> BatchOperationsIterator<'a> {
123        BatchOperationsIterator {
124            operations: self.take_self(),
125            buffer: Default::default(),
126            closed: false,
127        }
128    }
129
130    /// 异步发起批量操作,返回操作结果流
131    #[inline]
132    #[cfg(feature = "async")]
133    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
134    pub fn async_call(&mut self) -> BatchOperationsStream<'a> {
135        BatchOperationsStream::new(self.take_self())
136    }
137
138    fn take_self(&mut self) -> Self {
139        Self {
140            bucket: self.bucket,
141            operations: take(&mut self.operations),
142            batch_size: take(&mut self.batch_size),
143            callbacks: take(&mut self.callbacks),
144        }
145    }
146
147    #[allow(dead_code)]
148    fn assert() {
149        assert_impl!(Send: Self);
150        assert_impl!(Sync: Self);
151    }
152}
153
154impl Debug for BatchOperations<'_> {
155    #[inline]
156    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157        f.debug_struct("BatchOperations")
158            .field("bucket", &self.bucket)
159            .field("batch_size", &self.batch_size)
160            .finish()
161    }
162}
163
164/// 批量操作迭代器
165///
166/// 实现 [`std::iter::Iterator`] 接口,
167/// 在迭代过程中阻塞发起批量操作 API
168#[derive(Debug)]
169pub struct BatchOperationsIterator<'a> {
170    operations: BatchOperations<'a>,
171    buffer: VecDeque<ApiResult<OperationResponseData>>,
172    closed: bool,
173}
174
175impl Iterator for BatchOperationsIterator<'_> {
176    type Item = ApiResult<OperationResponseData>;
177
178    #[inline]
179    fn next(&mut self) -> Option<Self::Item> {
180        if let Some(first) = self.buffer.pop_front() {
181            Some(first)
182        } else if self.closed {
183            None
184        } else {
185            self.next_response().map(|v| v.map(Ok)).unwrap_or_else(|e| Some(Err(e)))
186        }
187    }
188}
189
190const DEFAULT_BATCH_SIZE: usize = 1000;
191type RefRegionProviderEndpoints<'a> = RegionsProviderEndpoints<&'a dyn RegionsProvider>;
192
193impl<'a> BatchOperationsIterator<'a> {
194    fn next_response(&mut self) -> ApiResult<Option<OperationResponseData>> {
195        if let Some(request_body) = self.generate_request_body() {
196            let request = self.make_request()?;
197            let response = self.call_request(request, request_body)?;
198            self.handle_response(response.into_body()).transpose()
199        } else {
200            Ok(None)
201        }
202    }
203
204    fn make_request(&self) -> ApiResult<BatchOpsSyncRequestBuilder<'a, RefRegionProviderEndpoints<'a>>> {
205        let request = self
206            .operations
207            .bucket
208            .objects_manager()
209            .client()
210            .storage()
211            .batch_ops()
212            .new_request(
213                RegionsProviderEndpoints::new(self.operations.bucket.region_provider()?),
214                self.operations.bucket.objects_manager().credential(),
215            );
216        Ok(request)
217    }
218
219    fn call_request(
220        &mut self,
221        mut request: BatchOpsSyncRequestBuilder<'_, RefRegionProviderEndpoints>,
222        request_body: RequestBody,
223    ) -> ApiResult<Response<ResponseBody>> {
224        self.operations
225            .callbacks
226            .before_request(request.parts_mut())
227            .map_err(make_callback_error)?;
228        let mut response_result = request.call(request_body);
229        self.operations
230            .callbacks
231            .after_response(&mut response_result)
232            .map_err(make_callback_error)?;
233        response_result
234    }
235
236    fn handle_response(&mut self, response_body: ResponseBody) -> Option<ApiResult<OperationResponseData>> {
237        let responses = response_body.to_operation_response_vec();
238        self.buffer
239            .extend(responses.into_iter().map(from_response_to_response_data_result));
240        self.buffer.pop_front()
241    }
242
243    fn generate_request_body(&mut self) -> Option<RequestBody> {
244        let mut request_body = RequestBody::default();
245        let mut operation_count = 0usize;
246        for _ in 0..self
247            .operations
248            .batch_size
249            .as_ref()
250            .map(|provider| provider.batch_size())
251            .unwrap_or(DEFAULT_BATCH_SIZE)
252        {
253            if let Some(mut operation) = self.operations.operations.as_mut().and_then(|op| op.next()) {
254                request_body = request_body.append_operations_as_str(operation.to_operation());
255                operation_count += 1;
256            } else {
257                self.closed = true;
258                break;
259            }
260        }
261        if operation_count > 0 {
262            Some(request_body)
263        } else {
264            None
265        }
266    }
267
268    #[allow(dead_code)]
269    fn assert() {
270        assert_impl!(Send: Self);
271        assert_impl!(Sync: Self);
272    }
273}
274
275#[cfg(feature = "async")]
276mod async_stream {
277    use super::*;
278    use futures::{future::BoxFuture, ready, FutureExt, Stream};
279    use qiniu_apis::storage::batch_ops::AsyncRequestBuilder as BatchOpsAsyncRequestBuilder;
280    use std::{
281        fmt::{self, Debug},
282        io::Result as IOResult,
283        pin::Pin,
284        task::{Context, Poll},
285    };
286
287    /// 批量操作流
288    ///
289    /// 实现 [`futures::stream::Stream`] 接口,
290    /// 在迭代过程中异步发起批量操作 API
291    #[must_use]
292    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
293    #[derive(Debug)]
294    pub struct BatchOperationsStream<'a> {
295        operations: BatchOperations<'a>,
296        current_step: BatchOperationsStep<'a>,
297        closed: bool,
298    }
299
300    enum BatchOperationsStep<'a> {
301        FromBuffer {
302            buffer: VecDeque<ApiResult<OperationResponseData>>,
303        },
304        WaitForResponse {
305            task: BoxFuture<'a, ApiResult<Response<ResponseBody>>>,
306        },
307        WaitForRegionProvider {
308            task: BoxFuture<'a, IOResult<&'a dyn RegionsProvider>>,
309        },
310        Done,
311    }
312
313    impl Default for BatchOperationsStep<'_> {
314        #[inline]
315        fn default() -> Self {
316            Self::FromBuffer { buffer: Default::default() }
317        }
318    }
319
320    impl Debug for BatchOperationsStep<'_> {
321        #[inline]
322        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
323            match self {
324                Self::FromBuffer { buffer } => f.debug_tuple("FromBuffer").field(buffer).finish(),
325                Self::WaitForResponse { .. } => f.debug_tuple("WaitForResponse").finish(),
326                Self::WaitForRegionProvider { .. } => f.debug_tuple("WaitForRegionProvider").finish(),
327                Self::Done => f.debug_tuple("Done").finish(),
328            }
329        }
330    }
331
332    impl Stream for BatchOperationsStream<'_> {
333        type Item = ApiResult<OperationResponseData>;
334
335        #[inline]
336        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
337            match self.current_step {
338                BatchOperationsStep::FromBuffer { .. } => self.read_from_buffer(cx),
339                BatchOperationsStep::WaitForResponse { .. } => self.wait_for_response(cx),
340                BatchOperationsStep::WaitForRegionProvider { .. } => self.wait_for_region(cx),
341                BatchOperationsStep::Done { .. } => Poll::Ready(None),
342            }
343        }
344    }
345
346    impl<'a> BatchOperationsStream<'a> {
347        pub(super) fn new(operations: BatchOperations<'a>) -> Self {
348            Self {
349                operations,
350                current_step: Default::default(),
351                closed: false,
352            }
353        }
354
355        fn read_from_buffer(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
356            if let BatchOperationsStep::FromBuffer { buffer } = &mut self.current_step {
357                if let Some(response) = buffer.pop_front() {
358                    Poll::Ready(Some(response))
359                } else if self.closed {
360                    self.current_step = BatchOperationsStep::Done;
361                    self.poll_next(cx)
362                } else {
363                    let bucket = self.operations.bucket;
364                    self.current_step = BatchOperationsStep::WaitForRegionProvider {
365                        task: Box::pin(async move { bucket.async_region_provider().await }),
366                    };
367                    self.poll_next(cx)
368                }
369            } else {
370                unreachable!()
371            }
372        }
373
374        fn wait_for_response(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
375            if let BatchOperationsStep::WaitForResponse { task } = &mut self.current_step {
376                let mut response_result = ready!(task.poll_unpin(cx));
377                if let Err(err) = self.operations.callbacks.after_response(&mut response_result) {
378                    self.current_step = BatchOperationsStep::Done;
379                    return Poll::Ready(Some(Err(make_callback_error(err))));
380                }
381                match response_result {
382                    Ok(response) => {
383                        self.current_step = BatchOperationsStep::FromBuffer {
384                            buffer: response
385                                .into_body()
386                                .to_operation_response_vec()
387                                .into_iter()
388                                .map(from_response_to_response_data_result)
389                                .collect(),
390                        };
391                        self.poll_next(cx)
392                    }
393                    Err(err) => {
394                        self.current_step = BatchOperationsStep::Done;
395                        Poll::Ready(Some(Err(err)))
396                    }
397                }
398            } else {
399                unreachable!()
400            }
401        }
402
403        fn wait_for_region(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
404            if let BatchOperationsStep::WaitForRegionProvider { task } = &mut self.current_step {
405                match ready!(task.poll_unpin(cx)) {
406                    Ok(region_provider) => {
407                        if let Some(request_body) = self.generate_request_body() {
408                            let mut request = self.make_request(region_provider);
409                            if let Err(err) = self.operations.callbacks.before_request(request.parts_mut()) {
410                                self.current_step = BatchOperationsStep::Done;
411                                Poll::Ready(Some(Err(make_callback_error(err))))
412                            } else {
413                                self.current_step = BatchOperationsStep::WaitForResponse {
414                                    task: Box::pin(async move { request.call(request_body).await }),
415                                };
416                                self.poll_next(cx)
417                            }
418                        } else {
419                            self.current_step = BatchOperationsStep::Done;
420                            self.poll_next(cx)
421                        }
422                    }
423                    Err(err) => {
424                        self.current_step = BatchOperationsStep::Done;
425                        Poll::Ready(Some(Err(err.into())))
426                    }
427                }
428            } else {
429                unreachable!()
430            }
431        }
432
433        fn generate_request_body(&mut self) -> Option<RequestBody> {
434            let mut request_body = RequestBody::default();
435            let mut operation_count = 0usize;
436            for _ in 0..self
437                .operations
438                .batch_size
439                .as_ref()
440                .map(|provider| provider.batch_size())
441                .unwrap_or(DEFAULT_BATCH_SIZE)
442            {
443                if let Some(mut operation) = self.operations.operations.as_mut().and_then(|op| op.next()) {
444                    request_body = request_body.append_operations_as_str(operation.to_operation());
445                    operation_count += 1;
446                } else {
447                    self.closed = true;
448                    break;
449                }
450            }
451            if operation_count > 0 {
452                Some(request_body)
453            } else {
454                None
455            }
456        }
457
458        fn make_request(
459            &self,
460            region_provider: &'a dyn RegionsProvider,
461        ) -> BatchOpsAsyncRequestBuilder<'a, RefRegionProviderEndpoints<'a>> {
462            self.operations
463                .bucket
464                .objects_manager()
465                .client()
466                .storage()
467                .batch_ops()
468                .new_async_request(
469                    RegionsProviderEndpoints::new(region_provider),
470                    self.operations.bucket.objects_manager().credential(),
471                )
472        }
473
474        #[allow(dead_code)]
475        fn assert() {
476            assert_impl!(Send: Self);
477            // assert_impl!(Sync: Self);
478        }
479    }
480}
481
482#[cfg(feature = "async")]
483pub use async_stream::*;
484
485fn from_response_to_response_data_result(response: OperationResponse) -> ApiResult<OperationResponseData> {
486    let status_code = StatusCode::from_u16(
487        response
488            .get_code_as_u64()
489            .try_into()
490            .map_err(make_invalid_request_response_error)?,
491    )
492    .map_err(make_invalid_request_response_error)?;
493
494    return if status_code == StatusCode::OK {
495        Ok(response.get_data().unwrap_or_default())
496    } else {
497        Err(ResponseError::new(
498            ResponseErrorKind::StatusCodeError(status_code),
499            response
500                .get_data()
501                .and_then(|data| data.get_error_as_str().map(|err| AnyError::msg(err.to_owned())))
502                .unwrap_or_else(|| NoErrorMessageFromOperation.into()),
503        ))
504    };
505
506    fn make_invalid_request_response_error(err: impl Into<AnyError>) -> ResponseError {
507        ResponseError::new(HttpResponseErrorKind::InvalidRequestResponse.into(), err)
508    }
509
510    #[derive(Clone, Debug)]
511    struct NoErrorMessageFromOperation;
512
513    impl Display for NoErrorMessageFromOperation {
514        #[inline]
515        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
516            Display::fmt("No Error Message from operation", f)
517        }
518    }
519
520    impl StdError for NoErrorMessageFromOperation {}
521}
522
523#[cfg(test)]
524mod tests {
525    use super::{super::ObjectsManager, *};
526    use qiniu_apis::{
527        credential::Credential,
528        http::{HeaderValue, HttpCaller, SyncRequest, SyncResponse, SyncResponseResult},
529        http_client::{DirectChooser, HttpClient, NeverRetrier, Region, SyncResponseBody, NO_BACKOFF},
530    };
531    use qiniu_utils::BucketName;
532    use serde_json::{json, to_vec as json_to_vec};
533    use std::{
534        io::Read,
535        sync::atomic::{AtomicUsize, Ordering},
536    };
537
538    #[cfg(feature = "async")]
539    use {
540        futures::{future::BoxFuture, AsyncReadExt, StreamExt},
541        qiniu_apis::http::{AsyncRequest, AsyncResponse, AsyncResponseBody, AsyncResponseResult},
542    };
543
544    #[test]
545    fn test_sync_batch_ops() -> anyhow::Result<()> {
546        env_logger::builder().is_test(true).try_init().ok();
547
548        #[derive(Debug, Default)]
549        struct FakeHttpCaller {
550            counter: AtomicUsize,
551        }
552
553        impl HttpCaller for FakeHttpCaller {
554            fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
555                let n = self.counter.fetch_add(1, Ordering::SeqCst);
556                let mut req_body = Vec::new();
557                request.body_mut().read_to_end(&mut req_body).unwrap();
558                let pairs: Vec<(String, String)> = form_urlencoded::parse(&req_body).into_owned().collect();
559                assert_eq!(pairs.len(), 3);
560                assert!(pairs.iter().all(|(k, _)| k == "op"));
561                let body = match n {
562                    0 => SyncResponseBody::from_bytes(
563                        json_to_vec(&json!([
564                            {"code": 200, "data": {}},
565                            {"code": 200, "data": {}},
566                            {"code": 200, "data": {}},
567                        ]))
568                        .unwrap(),
569                    ),
570                    1 => SyncResponseBody::from_bytes(
571                        json_to_vec(&json!([
572                            {"code": 200, "data": {}},
573                            {"code": 200, "data": {}},
574                            {"code": 612, "data": {"error": "no such file or directory"}},
575                        ]))
576                        .unwrap(),
577                    ),
578                    _ => unreachable!(),
579                };
580                Ok(SyncResponse::builder()
581                    .status_code(StatusCode::OK)
582                    .header("x-reqid", HeaderValue::from_static("FakeReqid"))
583                    .body(body)
584                    .build())
585            }
586
587            #[cfg(feature = "async")]
588            fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
589                unreachable!()
590            }
591        }
592
593        let bucket = get_bucket(FakeHttpCaller::default());
594        let mut ops = batch_ops(&bucket);
595        let mut iter = ops.call();
596        for _ in 0..5 {
597            iter.next().unwrap().unwrap();
598        }
599        assert_eq!(
600            iter.next().unwrap().unwrap_err().kind(),
601            ResponseErrorKind::StatusCodeError(StatusCode::from_u16(612)?)
602        );
603        Ok(())
604    }
605
606    #[cfg(feature = "async")]
607    #[async_std::test]
608    async fn test_async_batch_ops() -> anyhow::Result<()> {
609        env_logger::builder().is_test(true).try_init().ok();
610
611        #[derive(Debug, Default)]
612        struct FakeHttpCaller {
613            counter: AtomicUsize,
614        }
615
616        impl HttpCaller for FakeHttpCaller {
617            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
618                unreachable!()
619            }
620
621            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
622                Box::pin(async move {
623                    let n = self.counter.fetch_add(1, Ordering::SeqCst);
624                    let mut req_body = Vec::new();
625                    request.body_mut().read_to_end(&mut req_body).await.unwrap();
626                    let pairs: Vec<(String, String)> = form_urlencoded::parse(&req_body).into_owned().collect();
627                    assert_eq!(pairs.len(), 3);
628                    assert!(pairs.iter().all(|(k, _)| k == "op"));
629                    let body = match n {
630                        0 => AsyncResponseBody::from_bytes(
631                            json_to_vec(&json!([
632                                {"code": 200, "data": {}},
633                                {"code": 200, "data": {}},
634                                {"code": 200, "data": {}},
635                            ]))
636                            .unwrap(),
637                        ),
638                        1 => AsyncResponseBody::from_bytes(
639                            json_to_vec(&json!([
640                                {"code": 200, "data": {}},
641                                {"code": 200, "data": {}},
642                                {"code": 612, "data": {"error": "no such file or directory"}},
643                            ]))
644                            .unwrap(),
645                        ),
646                        _ => unreachable!(),
647                    };
648                    Ok(AsyncResponse::builder()
649                        .status_code(StatusCode::OK)
650                        .header("x-reqid", HeaderValue::from_static("FakeReqid"))
651                        .body(body)
652                        .build())
653                })
654            }
655        }
656
657        let bucket = get_bucket(FakeHttpCaller::default());
658        let mut ops = batch_ops(&bucket);
659        let mut iter = ops.async_call();
660        for _ in 0..5 {
661            iter.next().await.unwrap().unwrap();
662        }
663        assert_eq!(
664            iter.next().await.unwrap().unwrap_err().kind(),
665            ResponseErrorKind::StatusCodeError(StatusCode::from_u16(612)?)
666        );
667        Ok(())
668    }
669
670    fn get_bucket(caller: impl HttpCaller + 'static) -> Bucket {
671        let object_manager = ObjectsManager::builder(get_credential())
672            .http_client(
673                HttpClient::builder(caller)
674                    .chooser(DirectChooser)
675                    .request_retrier(NeverRetrier)
676                    .backoff(NO_BACKOFF)
677                    .build(),
678            )
679            .build();
680        object_manager.bucket_with_region(get_bucket_name(), single_rs_domain_region())
681    }
682
683    fn batch_ops(bucket: &Bucket) -> BatchOperations<'_> {
684        let mut ops = bucket.batch_ops();
685        ops.batch_size(3);
686        ops.add_operation(bucket.copy_object_to("fakeobject1", "fakebucket2", "fakeobject1"));
687        ops.add_operation(bucket.copy_object_to("fakeobject2", "fakebucket2", "fakeobject2"));
688        ops.add_operation(bucket.copy_object_to("fakeobject3", "fakebucket2", "fakeobject3"));
689        ops.add_operation(bucket.copy_object_to("fakeobject4", "fakebucket2", "fakeobject4"));
690        ops.add_operation(bucket.copy_object_to("fakeobject5", "fakebucket2", "fakeobject5"));
691        ops.add_operation(bucket.copy_object_to("fakeobject6", "fakebucket2", "fakeobject6"));
692        ops
693    }
694
695    fn get_credential() -> Credential {
696        Credential::new("fakeaccesskey", "fakesecretkey")
697    }
698
699    fn get_bucket_name() -> BucketName {
700        "fakebucketname".into()
701    }
702
703    fn single_rs_domain_region() -> Region {
704        Region::builder("chaotic")
705            .add_rs_preferred_endpoint(("fakers.example.com".to_owned(), 8080).into())
706            .build()
707    }
708}