1use super::{callbacks::Callbacks, list::make_callback_error, Bucket, OperationProvider};
2use anyhow::{Error as AnyError, Result as AnyResult};
3use assert_impl::assert_impl;
4use auto_impl::auto_impl;
5use dyn_clonable::clonable;
6use qiniu_apis::{
7 http::{ResponseErrorKind as HttpResponseErrorKind, ResponseParts, StatusCode},
8 http_client::{
9 ApiResult, RegionsProvider, RegionsProviderEndpoints, RequestBuilderParts, Response, ResponseError,
10 ResponseErrorKind,
11 },
12 storage::batch_ops::{
13 OperationResponse, OperationResponseData, RequestBody, ResponseBody,
14 SyncRequestBuilder as BatchOpsSyncRequestBuilder,
15 },
16};
17use std::{
18 collections::VecDeque,
19 error::Error as StdError,
20 fmt::{self, Debug, Display},
21 mem::take,
22};
23
24#[clonable]
26#[auto_impl(&, &mut, Box, Rc, Arc)]
27pub trait BatchSizeProvider: Clone + Debug + Send + Sync {
28 fn batch_size(&self) -> usize;
30}
31
32impl BatchSizeProvider for usize {
33 #[inline]
34 fn batch_size(&self) -> usize {
35 *self
36 }
37}
38
39pub struct BatchOperations<'a> {
41 bucket: &'a Bucket,
42 operations: Option<Box<dyn Iterator<Item = Box<dyn OperationProvider + 'a>> + Send + Sync + 'a>>,
43 batch_size: Option<Box<dyn BatchSizeProvider + 'a>>,
44 callbacks: Callbacks<'a>,
45}
46
47impl<'a> BatchOperations<'a> {
48 pub(super) fn new(bucket: &'a Bucket) -> Self {
49 Self {
50 bucket,
51 operations: Default::default(),
52 batch_size: Default::default(),
53 callbacks: Default::default(),
54 }
55 }
56
57 #[inline]
59 pub fn batch_size(&mut self, batch_size: impl BatchSizeProvider + 'a) -> &mut Self {
60 self.batch_size = Some(Box::new(batch_size));
61 self
62 }
63
64 #[inline]
66 pub fn add_operation(&mut self, operation: impl OperationProvider + 'a) -> &mut Self {
67 let new_iter = vec![Box::new(operation) as Box<dyn OperationProvider + 'a>].into_iter();
68 self.add_operations(new_iter)
69 }
70
71 #[inline]
73 pub fn add_operations<I: IntoIterator<Item = Box<dyn OperationProvider + 'a>> + Send + Sync + 'a>(
74 &mut self,
75 new_iter: I,
76 ) -> &mut Self
77 where
78 <I as IntoIterator>::IntoIter: Sync + Send,
79 {
80 if let Some(iter) = take(&mut self.operations) {
81 self.operations = Some(Box::new(iter.chain(new_iter)));
82 } else {
83 self.operations = Some(Box::new(new_iter.into_iter()));
84 }
85 self
86 }
87
88 #[inline]
90 pub fn before_request_callback(
91 &mut self,
92 callback: impl FnMut(&mut RequestBuilderParts<'_>) -> AnyResult<()> + Send + Sync + 'a,
93 ) -> &mut Self {
94 self.callbacks.insert_before_request_callback(callback);
95 self
96 }
97
98 #[inline]
100 pub fn after_response_ok_callback(
101 &mut self,
102 callback: impl FnMut(&mut ResponseParts) -> AnyResult<()> + Send + Sync + 'a,
103 ) -> &mut Self {
104 self.callbacks.insert_after_response_ok_callback(callback);
105 self
106 }
107
108 #[inline]
110 pub fn after_response_error_callback(
111 &mut self,
112 callback: impl FnMut(&mut ResponseError) -> AnyResult<()> + Send + Sync + 'a,
113 ) -> &mut Self {
114 self.callbacks.insert_after_response_error_callback(callback);
115 self
116 }
117
118 #[inline]
122 pub fn call(&mut self) -> BatchOperationsIterator<'a> {
123 BatchOperationsIterator {
124 operations: self.take_self(),
125 buffer: Default::default(),
126 closed: false,
127 }
128 }
129
130 #[inline]
132 #[cfg(feature = "async")]
133 #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
134 pub fn async_call(&mut self) -> BatchOperationsStream<'a> {
135 BatchOperationsStream::new(self.take_self())
136 }
137
138 fn take_self(&mut self) -> Self {
139 Self {
140 bucket: self.bucket,
141 operations: take(&mut self.operations),
142 batch_size: take(&mut self.batch_size),
143 callbacks: take(&mut self.callbacks),
144 }
145 }
146
147 #[allow(dead_code)]
148 fn assert() {
149 assert_impl!(Send: Self);
150 assert_impl!(Sync: Self);
151 }
152}
153
154impl Debug for BatchOperations<'_> {
155 #[inline]
156 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157 f.debug_struct("BatchOperations")
158 .field("bucket", &self.bucket)
159 .field("batch_size", &self.batch_size)
160 .finish()
161 }
162}
163
164#[derive(Debug)]
169pub struct BatchOperationsIterator<'a> {
170 operations: BatchOperations<'a>,
171 buffer: VecDeque<ApiResult<OperationResponseData>>,
172 closed: bool,
173}
174
175impl Iterator for BatchOperationsIterator<'_> {
176 type Item = ApiResult<OperationResponseData>;
177
178 #[inline]
179 fn next(&mut self) -> Option<Self::Item> {
180 if let Some(first) = self.buffer.pop_front() {
181 Some(first)
182 } else if self.closed {
183 None
184 } else {
185 self.next_response().map(|v| v.map(Ok)).unwrap_or_else(|e| Some(Err(e)))
186 }
187 }
188}
189
/// Number of operations per batch request when no `BatchSizeProvider` was set.
const DEFAULT_BATCH_SIZE: usize = 1_000;
191type RefRegionProviderEndpoints<'a> = RegionsProviderEndpoints<&'a dyn RegionsProvider>;
192
193impl<'a> BatchOperationsIterator<'a> {
194 fn next_response(&mut self) -> ApiResult<Option<OperationResponseData>> {
195 if let Some(request_body) = self.generate_request_body() {
196 let request = self.make_request()?;
197 let response = self.call_request(request, request_body)?;
198 self.handle_response(response.into_body()).transpose()
199 } else {
200 Ok(None)
201 }
202 }
203
204 fn make_request(&self) -> ApiResult<BatchOpsSyncRequestBuilder<'a, RefRegionProviderEndpoints<'a>>> {
205 let request = self
206 .operations
207 .bucket
208 .objects_manager()
209 .client()
210 .storage()
211 .batch_ops()
212 .new_request(
213 RegionsProviderEndpoints::new(self.operations.bucket.region_provider()?),
214 self.operations.bucket.objects_manager().credential(),
215 );
216 Ok(request)
217 }
218
219 fn call_request(
220 &mut self,
221 mut request: BatchOpsSyncRequestBuilder<'_, RefRegionProviderEndpoints>,
222 request_body: RequestBody,
223 ) -> ApiResult<Response<ResponseBody>> {
224 self.operations
225 .callbacks
226 .before_request(request.parts_mut())
227 .map_err(make_callback_error)?;
228 let mut response_result = request.call(request_body);
229 self.operations
230 .callbacks
231 .after_response(&mut response_result)
232 .map_err(make_callback_error)?;
233 response_result
234 }
235
236 fn handle_response(&mut self, response_body: ResponseBody) -> Option<ApiResult<OperationResponseData>> {
237 let responses = response_body.to_operation_response_vec();
238 self.buffer
239 .extend(responses.into_iter().map(from_response_to_response_data_result));
240 self.buffer.pop_front()
241 }
242
243 fn generate_request_body(&mut self) -> Option<RequestBody> {
244 let mut request_body = RequestBody::default();
245 let mut operation_count = 0usize;
246 for _ in 0..self
247 .operations
248 .batch_size
249 .as_ref()
250 .map(|provider| provider.batch_size())
251 .unwrap_or(DEFAULT_BATCH_SIZE)
252 {
253 if let Some(mut operation) = self.operations.operations.as_mut().and_then(|op| op.next()) {
254 request_body = request_body.append_operations_as_str(operation.to_operation());
255 operation_count += 1;
256 } else {
257 self.closed = true;
258 break;
259 }
260 }
261 if operation_count > 0 {
262 Some(request_body)
263 } else {
264 None
265 }
266 }
267
268 #[allow(dead_code)]
269 fn assert() {
270 assert_impl!(Send: Self);
271 assert_impl!(Sync: Self);
272 }
273}
274
#[cfg(feature = "async")]
mod async_stream {
    use super::*;
    use futures::{future::BoxFuture, ready, FutureExt, Stream};
    use qiniu_apis::storage::batch_ops::AsyncRequestBuilder as BatchOpsAsyncRequestBuilder;
    use std::{
        fmt::{self, Debug},
        io::Result as IOResult,
        pin::Pin,
        task::{Context, Poll},
    };

    /// Asynchronous stream over the results of batch operations.
    ///
    /// Created by `BatchOperations::async_call`. Results buffered from the last
    /// response are yielded first. Once the buffer is empty, the bucket's region
    /// provider is resolved, the next batch is sent, and its per-operation results
    /// refill the buffer. A request-level failure (region lookup, callback or HTTP
    /// error) is yielded once and then ends the stream.
    #[must_use]
    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
    #[derive(Debug)]
    pub struct BatchOperationsStream<'a> {
        /// The drained builder: operation source, batch size and callbacks.
        operations: BatchOperations<'a>,
        /// Current state of the polling state machine.
        current_step: BatchOperationsStep<'a>,
        /// Set once the operation source has been exhausted.
        closed: bool,
    }

    /// States of the `BatchOperationsStream` polling state machine.
    enum BatchOperationsStep<'a> {
        /// Yield the results buffered from the last response.
        FromBuffer {
            buffer: VecDeque<ApiResult<OperationResponseData>>,
        },
        /// Wait for the in-flight batch request to complete.
        WaitForResponse {
            task: BoxFuture<'a, ApiResult<Response<ResponseBody>>>,
        },
        /// Wait for the bucket's region provider before building the next request.
        WaitForRegionProvider {
            task: BoxFuture<'a, IOResult<&'a dyn RegionsProvider>>,
        },
        /// Terminal state: every further poll yields `None`.
        Done,
    }

    impl Default for BatchOperationsStep<'_> {
        /// Starts with an empty buffer, so the first poll begins fetching the first batch.
        #[inline]
        fn default() -> Self {
            Self::FromBuffer {
                buffer: Default::default(),
            }
        }
    }

    impl Debug for BatchOperationsStep<'_> {
        /// Shows the buffer contents; the pending futures are not printable.
        #[inline]
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                Self::FromBuffer { buffer } => f.debug_tuple("FromBuffer").field(buffer).finish(),
                Self::WaitForResponse { .. } => f.debug_tuple("WaitForResponse").finish(),
                Self::WaitForRegionProvider { .. } => f.debug_tuple("WaitForRegionProvider").finish(),
                Self::Done => f.debug_tuple("Done").finish(),
            }
        }
    }

    impl Stream for BatchOperationsStream<'_> {
        type Item = ApiResult<OperationResponseData>;

        /// Dispatches on the current state. Each handler may switch states and re-poll.
        #[inline]
        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            match self.current_step {
                BatchOperationsStep::FromBuffer { .. } => self.read_from_buffer(cx),
                BatchOperationsStep::WaitForResponse { .. } => self.wait_for_response(cx),
                BatchOperationsStep::WaitForRegionProvider { .. } => self.wait_for_region(cx),
                BatchOperationsStep::Done => Poll::Ready(None),
            }
        }
    }

    impl<'a> BatchOperationsStream<'a> {
        /// Creates a stream that has not sent any request yet.
        pub(super) fn new(operations: BatchOperations<'a>) -> Self {
            Self {
                operations,
                current_step: Default::default(),
                closed: false,
            }
        }

        /// `FromBuffer` state: yields the next buffered result, or starts the next batch.
        fn read_from_buffer(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
            if let BatchOperationsStep::FromBuffer { buffer } = &mut self.current_step {
                if let Some(response) = buffer.pop_front() {
                    Poll::Ready(Some(response))
                } else if self.closed {
                    // Buffer drained and no operation left: finish the stream.
                    self.current_step = BatchOperationsStep::Done;
                    self.poll_next(cx)
                } else {
                    // The region provider is resolved again for every batch.
                    let bucket = self.operations.bucket;
                    self.current_step = BatchOperationsStep::WaitForRegionProvider {
                        task: Box::pin(async move { bucket.async_region_provider().await }),
                    };
                    self.poll_next(cx)
                }
            } else {
                unreachable!()
            }
        }

        /// `WaitForResponse` state: runs the after-response callbacks and refills the
        /// buffer from the response. Callback and HTTP errors are yielded and end the stream.
        fn wait_for_response(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
            if let BatchOperationsStep::WaitForResponse { task } = &mut self.current_step {
                let mut response_result = ready!(task.poll_unpin(cx));
                if let Err(err) = self.operations.callbacks.after_response(&mut response_result) {
                    self.current_step = BatchOperationsStep::Done;
                    return Poll::Ready(Some(Err(make_callback_error(err))));
                }
                match response_result {
                    Ok(response) => {
                        // An empty response leaves the buffer empty, so the next poll
                        // moves on to the following batch.
                        self.current_step = BatchOperationsStep::FromBuffer {
                            buffer: response
                                .into_body()
                                .to_operation_response_vec()
                                .into_iter()
                                .map(from_response_to_response_data_result)
                                .collect(),
                        };
                        self.poll_next(cx)
                    }
                    Err(err) => {
                        self.current_step = BatchOperationsStep::Done;
                        Poll::Ready(Some(Err(err)))
                    }
                }
            } else {
                unreachable!()
            }
        }

        /// `WaitForRegionProvider` state: builds the next batch request and runs the
        /// before-request callbacks. Finishes when no operation is left; region and
        /// callback errors are yielded and end the stream.
        fn wait_for_region(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
            if let BatchOperationsStep::WaitForRegionProvider { task } = &mut self.current_step {
                match ready!(task.poll_unpin(cx)) {
                    Ok(region_provider) => {
                        if let Some(request_body) = self.generate_request_body() {
                            let mut request = self.make_request(region_provider);
                            if let Err(err) = self.operations.callbacks.before_request(request.parts_mut()) {
                                self.current_step = BatchOperationsStep::Done;
                                Poll::Ready(Some(Err(make_callback_error(err))))
                            } else {
                                self.current_step = BatchOperationsStep::WaitForResponse {
                                    task: Box::pin(async move { request.call(request_body).await }),
                                };
                                self.poll_next(cx)
                            }
                        } else {
                            self.current_step = BatchOperationsStep::Done;
                            self.poll_next(cx)
                        }
                    }
                    Err(err) => {
                        self.current_step = BatchOperationsStep::Done;
                        Poll::Ready(Some(Err(err.into())))
                    }
                }
            } else {
                unreachable!()
            }
        }

        /// Packs up to `batch_size` pending operations into a request body.
        ///
        /// Returns `None` when no operation is left. Sets `closed` as soon as the
        /// operation source is exhausted. A batch size of 0 is treated as 1; otherwise
        /// the stream would end without sending the queued operations.
        fn generate_request_body(&mut self) -> Option<RequestBody> {
            let batch_size = self
                .operations
                .batch_size
                .as_ref()
                .map(|provider| provider.batch_size())
                .unwrap_or(DEFAULT_BATCH_SIZE)
                .max(1);
            let mut request_body = RequestBody::default();
            let mut operation_count = 0usize;
            for _ in 0..batch_size {
                if let Some(mut operation) = self.operations.operations.as_mut().and_then(|op| op.next()) {
                    request_body = request_body.append_operations_as_str(operation.to_operation());
                    operation_count += 1;
                } else {
                    self.closed = true;
                    break;
                }
            }
            if operation_count > 0 {
                Some(request_body)
            } else {
                None
            }
        }

        /// Builds an async batch request for the bucket against `region_provider`,
        /// using the objects manager's credential.
        fn make_request(
            &self,
            region_provider: &'a dyn RegionsProvider,
        ) -> BatchOpsAsyncRequestBuilder<'a, RefRegionProviderEndpoints<'a>> {
            self.operations
                .bucket
                .objects_manager()
                .client()
                .storage()
                .batch_ops()
                .new_async_request(
                    RegionsProviderEndpoints::new(region_provider),
                    self.operations.bucket.objects_manager().credential(),
                )
        }

        /// Compile-time check that the stream is `Send`; never called.
        #[allow(dead_code)]
        fn assert() {
            assert_impl!(Send: Self);
        }
    }
}
481
482#[cfg(feature = "async")]
483pub use async_stream::*;
484
485fn from_response_to_response_data_result(response: OperationResponse) -> ApiResult<OperationResponseData> {
486 let status_code = StatusCode::from_u16(
487 response
488 .get_code_as_u64()
489 .try_into()
490 .map_err(make_invalid_request_response_error)?,
491 )
492 .map_err(make_invalid_request_response_error)?;
493
494 return if status_code == StatusCode::OK {
495 Ok(response.get_data().unwrap_or_default())
496 } else {
497 Err(ResponseError::new(
498 ResponseErrorKind::StatusCodeError(status_code),
499 response
500 .get_data()
501 .and_then(|data| data.get_error_as_str().map(|err| AnyError::msg(err.to_owned())))
502 .unwrap_or_else(|| NoErrorMessageFromOperation.into()),
503 ))
504 };
505
506 fn make_invalid_request_response_error(err: impl Into<AnyError>) -> ResponseError {
507 ResponseError::new(HttpResponseErrorKind::InvalidRequestResponse.into(), err)
508 }
509
510 #[derive(Clone, Debug)]
511 struct NoErrorMessageFromOperation;
512
513 impl Display for NoErrorMessageFromOperation {
514 #[inline]
515 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
516 Display::fmt("No Error Message from operation", f)
517 }
518 }
519
520 impl StdError for NoErrorMessageFromOperation {}
521}
522
#[cfg(test)]
mod tests {
    use super::{super::ObjectsManager, *};
    use qiniu_apis::{
        credential::Credential,
        http::{HeaderValue, HttpCaller, SyncRequest, SyncResponse, SyncResponseResult},
        http_client::{DirectChooser, HttpClient, NeverRetrier, Region, SyncResponseBody, NO_BACKOFF},
    };
    use qiniu_utils::BucketName;
    use serde_json::{json, to_vec as json_to_vec};
    use std::{
        io::Read,
        sync::atomic::{AtomicUsize, Ordering},
    };

    #[cfg(feature = "async")]
    use {
        futures::{future::BoxFuture, AsyncReadExt, StreamExt},
        qiniu_apis::http::{AsyncRequest, AsyncResponse, AsyncResponseBody, AsyncResponseResult},
    };

    /// Checks that a form-encoded batch request body carries exactly three `op`
    /// fields, matching the batch size configured by `batch_ops`.
    fn assert_three_ops(req_body: &[u8]) {
        let pairs: Vec<(String, String)> = form_urlencoded::parse(req_body).into_owned().collect();
        assert_eq!(pairs.len(), 3);
        assert!(pairs.iter().all(|(k, _)| k == "op"));
    }

    /// JSON body answered to the `nth` fake batch request: the first batch fully
    /// succeeds, and the last entry of the second batch fails with code 612.
    fn fake_response_json(nth: usize) -> Vec<u8> {
        let results = match nth {
            0 => json!([
                {"code": 200, "data": {}},
                {"code": 200, "data": {}},
                {"code": 200, "data": {}},
            ]),
            1 => json!([
                {"code": 200, "data": {}},
                {"code": 200, "data": {}},
                {"code": 612, "data": {"error": "no such file or directory"}},
            ]),
            _ => unreachable!(),
        };
        json_to_vec(&results).unwrap()
    }

    #[test]
    fn test_sync_batch_ops() -> anyhow::Result<()> {
        env_logger::builder().is_test(true).try_init().ok();

        #[derive(Debug, Default)]
        struct FakeHttpCaller {
            counter: AtomicUsize,
        }

        impl HttpCaller for FakeHttpCaller {
            fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
                let nth = self.counter.fetch_add(1, Ordering::SeqCst);
                let mut req_body = Vec::new();
                request.body_mut().read_to_end(&mut req_body).unwrap();
                assert_three_ops(&req_body);
                let body = SyncResponseBody::from_bytes(fake_response_json(nth));
                Ok(SyncResponse::builder()
                    .status_code(StatusCode::OK)
                    .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                    .body(body)
                    .build())
            }

            #[cfg(feature = "async")]
            fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
                unreachable!()
            }
        }

        let bucket = get_bucket(FakeHttpCaller::default());
        let mut ops = batch_ops(&bucket);
        let mut results = ops.call();
        // The first five operations succeed; the sixth reports status 612.
        for _ in 0..5 {
            results.next().unwrap().unwrap();
        }
        assert_eq!(
            results.next().unwrap().unwrap_err().kind(),
            ResponseErrorKind::StatusCodeError(StatusCode::from_u16(612)?)
        );
        Ok(())
    }

    #[cfg(feature = "async")]
    #[async_std::test]
    async fn test_async_batch_ops() -> anyhow::Result<()> {
        env_logger::builder().is_test(true).try_init().ok();

        #[derive(Debug, Default)]
        struct FakeHttpCaller {
            counter: AtomicUsize,
        }

        impl HttpCaller for FakeHttpCaller {
            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
                unreachable!()
            }

            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
                Box::pin(async move {
                    let nth = self.counter.fetch_add(1, Ordering::SeqCst);
                    let mut req_body = Vec::new();
                    request.body_mut().read_to_end(&mut req_body).await.unwrap();
                    assert_three_ops(&req_body);
                    let body = AsyncResponseBody::from_bytes(fake_response_json(nth));
                    Ok(AsyncResponse::builder()
                        .status_code(StatusCode::OK)
                        .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                        .body(body)
                        .build())
                })
            }
        }

        let bucket = get_bucket(FakeHttpCaller::default());
        let mut ops = batch_ops(&bucket);
        let mut results = ops.async_call();
        // The first five operations succeed; the sixth reports status 612.
        for _ in 0..5 {
            results.next().await.unwrap().unwrap();
        }
        assert_eq!(
            results.next().await.unwrap().unwrap_err().kind(),
            ResponseErrorKind::StatusCodeError(StatusCode::from_u16(612)?)
        );
        Ok(())
    }

    /// Bucket whose HTTP client sends every request straight to `caller`, without
    /// retries or backoff.
    fn get_bucket(caller: impl HttpCaller + 'static) -> Bucket {
        let http_client = HttpClient::builder(caller)
            .chooser(DirectChooser)
            .request_retrier(NeverRetrier)
            .backoff(NO_BACKOFF)
            .build();
        let objects_manager = ObjectsManager::builder(get_credential())
            .http_client(http_client)
            .build();
        objects_manager.bucket_with_region(get_bucket_name(), single_rs_domain_region())
    }

    /// Six copy operations, sent in batches of three.
    fn batch_ops(bucket: &Bucket) -> BatchOperations<'_> {
        let mut ops = bucket.batch_ops();
        ops.batch_size(3);
        ops.add_operation(bucket.copy_object_to("fakeobject1", "fakebucket2", "fakeobject1"));
        ops.add_operation(bucket.copy_object_to("fakeobject2", "fakebucket2", "fakeobject2"));
        ops.add_operation(bucket.copy_object_to("fakeobject3", "fakebucket2", "fakeobject3"));
        ops.add_operation(bucket.copy_object_to("fakeobject4", "fakebucket2", "fakeobject4"));
        ops.add_operation(bucket.copy_object_to("fakeobject5", "fakebucket2", "fakeobject5"));
        ops.add_operation(bucket.copy_object_to("fakeobject6", "fakebucket2", "fakeobject6"));
        ops
    }

    fn get_credential() -> Credential {
        Credential::new("fakeaccesskey", "fakesecretkey")
    }

    fn get_bucket_name() -> BucketName {
        "fakebucketname".into()
    }

    /// Region exposing a single RS endpoint, so every batch request goes to it.
    fn single_rs_domain_region() -> Region {
        Region::builder("chaotic")
            .add_rs_preferred_endpoint(("fakers.example.com".to_owned(), 8080).into())
            .build()
    }
}