1use super::{callbacks::Callbacks, Bucket};
2use anyhow::Error as AnyError;
3use assert_impl::assert_impl;
4use log::warn;
5use qiniu_apis::{
6 http::ResponseErrorKind as HttpResponseErrorKind,
7 http_client::{ApiResult, RegionsProvider, RegionsProviderEndpoints, Response, ResponseError, SyncResponseBody},
8 storage::get_objects::{
9 ListedObjectEntry, QueryParams, ResponseBody as GetObjectsV1ResponseBody,
10 SyncRequestBuilder as GetObjectsV1SyncRequestBuilder,
11 },
12 storage::get_objects_v2::SyncRequestBuilder as GetObjectsV2SyncRequestBuilder,
13};
14use serde::{Deserialize, Serialize};
15use std::{
16 borrow::Cow,
17 collections::VecDeque,
18 fmt::{self, Debug},
19 io::{BufRead, BufReader, Lines},
20};
21use tap::prelude::*;
22
23#[cfg(feature = "async")]
24use {futures::io::BufReader as AsyncBufReader, qiniu_apis::http_client::AsyncResponseBody};
25
26type RefRegionProviderEndpoints<'a> = RegionsProviderEndpoints<&'a dyn RegionsProvider>;
27
28#[derive(Debug, Clone)]
29struct ListParams<'a> {
30 bucket: &'a Bucket,
31 prefix: Option<Cow<'a, str>>,
32 limit: Limit,
33 marker: Marker<'a>,
34 need_parts: bool,
35}
36
37#[derive(Debug, Clone)]
38enum Marker<'a> {
39 Original(Option<Cow<'a, str>>),
40 Subsequent(Option<String>),
41}
42
43impl<'a> Marker<'a> {
44 fn new(marker: Option<Cow<'a, str>>) -> Self {
45 Self::Original(marker)
46 }
47
48 fn empty(&self) -> bool {
49 matches!(self.as_ref().map(|s| s.is_empty()), Some(true) | None)
50 }
51
52 fn as_ref(&self) -> Option<&str> {
53 match self {
54 Self::Original(marker) => marker.as_deref(),
55 Self::Subsequent(marker) => marker.as_deref(),
56 }
57 }
58
59 fn set(&mut self, marker: Option<&str>) {
60 *self = Self::Subsequent(marker.map(|s| s.to_owned()));
61 }
62
63 fn is_original(&self) -> bool {
64 matches!(self, Self::Original(..))
65 }
66}
67
68#[derive(Copy, Debug, Clone)]
69struct Limit {
70 limit: Option<usize>,
71 max: Option<usize>,
72}
73
74impl Limit {
75 fn new(limit: Option<usize>, version: ListVersion) -> Self {
76 Self {
77 limit,
78 max: version.page_limit(),
79 }
80 }
81
82 fn as_ref(&self) -> Option<usize> {
83 match (self.limit, self.max) {
84 (Some(limit), Some(max)) => Some(limit.min(max)),
85 (Some(limit), None) => Some(limit),
86 (None, Some(max)) => Some(max),
87 (None, None) => None,
88 }
89 }
90
91 fn exhausted(&self) -> bool {
92 matches!(self.limit, Some(0))
93 }
94
95 fn saturating_decrease(&mut self, sub: usize) {
96 if let Some(limit) = self.limit.as_mut() {
97 *limit = limit.saturating_sub(sub);
98 }
99 }
100}
101
102impl<'a> ListParams<'a> {
103 fn to_query_params(&self) -> QueryParams<'a> {
104 let mut query_params = QueryParams::default().set_bucket_as_str(self.bucket.name().as_str());
105 if let Some(marker) = self.marker.as_ref() {
106 query_params = query_params.set_marker_as_str(marker.to_owned());
107 }
108 if let Some(limit) = self.limit.as_ref() {
109 query_params = query_params.set_limit_as_usize(limit);
110 }
111 if let Some(prefix) = self.prefix.as_ref() {
112 query_params = query_params.set_prefix_as_str(prefix.clone());
113 }
114 if self.need_parts {
115 query_params = query_params.set_need_parts_as_bool(true);
116 }
117 query_params
118 }
119
120 fn have_done(&self) -> bool {
121 self.limit.exhausted() || !self.marker.is_original() && self.marker.empty()
122 }
123}
124
125#[must_use]
132pub struct ListIter<'a> {
133 params: ListParams<'a>,
134 version: SyncListVersionWithStep,
135 callbacks: Callbacks<'a>,
136}
137
138impl ListIter<'_> {
139 pub fn marker(&self) -> Option<&str> {
141 self.params.marker.as_ref()
142 }
143}
144
145impl Debug for ListIter<'_> {
146 #[inline]
147 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148 f.debug_struct("ListIter")
149 .field("params", &self.params)
150 .field("version", &self.version)
151 .finish()
152 }
153}
154
155#[derive(Copy, Clone, Debug, Default)]
159#[non_exhaustive]
160pub enum ListVersion {
161 #[default]
163 V1,
164
165 V2,
167}
168
169impl ListVersion {
170 fn page_limit(self) -> Option<usize> {
171 const V1_PAGE_SIZE_MAX: usize = 1000;
172
173 match self {
174 Self::V1 => Some(V1_PAGE_SIZE_MAX),
175 Self::V2 => None,
176 }
177 }
178}
179
180#[derive(Debug)]
181enum SyncListVersionWithStep {
182 V1(SyncListV1Step),
183 V2(SyncListV2Step),
184}
185
186impl From<ListVersion> for SyncListVersionWithStep {
187 fn from(version: ListVersion) -> Self {
188 match version {
189 ListVersion::V1 => Self::V1(Default::default()),
190 ListVersion::V2 => Self::V2(Default::default()),
191 }
192 }
193}
194
195#[derive(Clone, Debug)]
196pub(super) enum SyncListV1Step {
197 Buffer {
198 buffer: VecDeque<ListedObjectEntry>,
199 },
200 Done,
201}
202
203impl Default for SyncListV1Step {
204 #[inline]
205 fn default() -> Self {
206 Self::Buffer { buffer: Default::default() }
207 }
208}
209
210#[derive(Debug, Default)]
211pub(super) enum SyncListV2Step {
212 #[default]
213 Start,
214 Lines {
215 lines: Lines<BufReader<SyncResponseBody>>,
216 },
217 Done,
218}
219
220#[derive(Clone, Debug, Serialize, Deserialize)]
221struct ListedObjectEntryV2 {
222 item: Option<ListedObjectEntry>,
223 marker: Option<String>,
224}
225
226impl<'a> ListIter<'a> {
227 pub(super) fn new(
228 bucket: &'a Bucket,
229 limit: Option<usize>,
230 prefix: Option<Cow<'a, str>>,
231 marker: Option<Cow<'a, str>>,
232 need_parts: bool,
233 version: ListVersion,
234 callbacks: Callbacks<'a>,
235 ) -> Self {
236 Self {
237 callbacks,
238 version: version.into(),
239 params: ListParams {
240 bucket,
241 prefix,
242 need_parts,
243 limit: Limit::new(limit, version),
244 marker: Marker::new(marker),
245 },
246 }
247 }
248
249 #[allow(dead_code)]
250 fn assert() {
251 assert_impl!(Send: Self);
252 }
254}
255
256impl Iterator for ListIter<'_> {
257 type Item = ApiResult<ListedObjectEntry>;
258
259 #[inline]
260 fn next(&mut self) -> Option<Self::Item> {
261 return match &mut self.version {
262 SyncListVersionWithStep::V1(step) => v1_next(&mut self.params, &mut self.callbacks, step),
263 SyncListVersionWithStep::V2(step) => v2_next(&mut self.params, &mut self.callbacks, step),
264 };
265
266 fn v1_next(
267 params: &mut ListParams<'_>,
268 callbacks: &mut Callbacks<'_>,
269 step: &mut SyncListV1Step,
270 ) -> Option<ApiResult<ListedObjectEntry>> {
271 match step {
272 SyncListV1Step::Buffer { buffer } => {
273 if let Some(object) = buffer.pop_front() {
274 Some(Ok(object))
275 } else {
276 match v1_next_page(params, callbacks, buffer) {
277 Ok(true) => {
278 *step = SyncListV1Step::Done;
279 None
280 }
281 Ok(false) => buffer.pop_front().map(Ok),
282 Err(err) => {
283 *step = SyncListV1Step::Done;
284 Some(Err(err))
285 }
286 }
287 }
288 }
289 SyncListV1Step::Done => None,
290 }
291 }
292
293 fn v1_next_page(
294 params: &mut ListParams<'_>,
295 callbacks: &mut Callbacks<'_>,
296 buffer: &mut VecDeque<ListedObjectEntry>,
297 ) -> ApiResult<bool> {
298 let mut have_done = false;
299 if params.have_done() {
300 have_done = true;
301 } else {
302 let request = v1_make_request(params)?;
303 let response_result = v1_call_request(request, callbacks);
304 v1_handle_response(response_result?.into_body(), params, buffer);
305 }
306 Ok(have_done)
307 }
308
309 fn v1_make_request<'a>(
310 params: &mut ListParams<'a>,
311 ) -> ApiResult<GetObjectsV1SyncRequestBuilder<'a, RefRegionProviderEndpoints<'a>>> {
312 let mut request = params
313 .bucket
314 .objects_manager()
315 .client()
316 .storage()
317 .get_objects()
318 .new_request(
319 RegionsProviderEndpoints::new(params.bucket.region_provider()?),
320 params.bucket.objects_manager().credential(),
321 );
322 request.query_pairs(params.to_query_params());
323 Ok(request)
324 }
325
326 fn v1_call_request(
327 mut request: GetObjectsV1SyncRequestBuilder<'_, RefRegionProviderEndpoints>,
328 callbacks: &mut Callbacks<'_>,
329 ) -> ApiResult<Response<GetObjectsV1ResponseBody>> {
330 callbacks
331 .before_request(request.parts_mut())
332 .map_err(make_callback_error)?;
333 let mut response_result = request.call();
334 callbacks
335 .after_response(&mut response_result)
336 .map_err(make_callback_error)?;
337 response_result
338 }
339
340 fn v1_handle_response(
341 body: GetObjectsV1ResponseBody,
342 params: &mut ListParams<'_>,
343 buffer: &mut VecDeque<ListedObjectEntry>,
344 ) {
345 params.marker.set(body.get_marker_as_str());
346 let listed_object_entries = body.get_items().to_listed_object_entry_vec();
347 params.limit.saturating_decrease(listed_object_entries.len());
348 *buffer = listed_object_entries.into();
349 }
350
351 fn v2_next(
352 params: &mut ListParams<'_>,
353 callbacks: &mut Callbacks<'_>,
354 step: &mut SyncListV2Step,
355 ) -> Option<ApiResult<ListedObjectEntry>> {
356 match step {
357 SyncListV2Step::Start => match v2_call(params, callbacks) {
358 Ok(Some(mut lines)) => v2_read_entry_from_lines(params, &mut lines).tap_some(|result| {
359 if result.is_ok() {
360 *step = SyncListV2Step::Lines { lines };
361 } else {
362 *step = SyncListV2Step::Done;
363 }
364 }),
365 Ok(None) => {
366 *step = SyncListV2Step::Done;
367 None
368 }
369 Err(err) => {
370 *step = SyncListV2Step::Done;
371 Some(Err(err))
372 }
373 },
374 SyncListV2Step::Lines { lines } => match v2_read_entry_from_lines(params, lines) {
375 Some(Ok(entry)) => Some(Ok(entry)),
376 Some(Err(err)) => {
377 warn!("Read Error from ListV2 Response Body: {}", err);
378 *step = SyncListV2Step::Start;
379 v2_next(params, callbacks, step)
380 }
381 None => {
382 *step = SyncListV2Step::Start;
383 v2_next(params, callbacks, step)
384 }
385 },
386 SyncListV2Step::Done => None,
387 }
388 }
389
390 fn v2_read_entry_from_lines(
391 params: &mut ListParams<'_>,
392 lines: &mut Lines<BufReader<SyncResponseBody>>,
393 ) -> Option<ApiResult<ListedObjectEntry>> {
394 if params.limit.exhausted() {
395 return None;
396 }
397 loop {
398 match lines.next() {
399 Some(Ok(line)) if line.is_empty() => {
400 continue;
401 }
402 Some(Ok(line)) => match serde_json::from_str::<ListedObjectEntryV2>(&line) {
403 Ok(parsed) => {
404 params.marker.set(parsed.marker.as_deref());
405 if let Some(item) = parsed.item {
406 params.limit.saturating_decrease(1);
407 return Some(Ok(item));
408 } else {
409 continue;
410 }
411 }
412 Err(err) => {
413 return Some(Err(err.into()));
414 }
415 },
416 Some(Err(err)) => {
417 return Some(Err(err.into()));
418 }
419 None => {
420 return None;
421 }
422 }
423 }
424 }
425
426 fn v2_call(
427 params: &mut ListParams<'_>,
428 callbacks: &mut Callbacks<'_>,
429 ) -> ApiResult<Option<Lines<BufReader<SyncResponseBody>>>> {
430 if params.have_done() {
431 return Ok(None);
432 }
433 let request = v2_make_request(params)?;
434 let response_result = v2_call_request(request, callbacks);
435 Ok(Some(BufReader::new(response_result?.into_body()).lines()))
436 }
437
438 fn v2_make_request<'a>(
439 params: &mut ListParams<'a>,
440 ) -> ApiResult<GetObjectsV2SyncRequestBuilder<'a, RefRegionProviderEndpoints<'a>>> {
441 let mut request = params
442 .bucket
443 .objects_manager()
444 .client()
445 .storage()
446 .get_objects_v2()
447 .new_request(
448 RegionsProviderEndpoints::new(params.bucket.region_provider()?),
449 params.bucket.objects_manager().credential(),
450 );
451 request.query_pairs(params.to_query_params());
452 Ok(request)
453 }
454
455 fn v2_call_request(
456 mut request: GetObjectsV2SyncRequestBuilder<'_, RefRegionProviderEndpoints>,
457 callbacks: &mut Callbacks<'_>,
458 ) -> ApiResult<Response<SyncResponseBody>> {
459 callbacks
460 .before_request(request.parts_mut())
461 .map_err(make_callback_error)?;
462 let mut response_result = request.call();
463 callbacks
464 .after_response(&mut response_result)
465 .map_err(make_callback_error)?;
466 response_result
467 }
468 }
469}
470
471#[cfg(feature = "async")]
472mod async_list_stream {
473 use super::*;
474 use futures::{future::BoxFuture, io::Lines as AsyncLines, ready, AsyncBufReadExt, FutureExt, Stream, StreamExt};
475 use std::{
476 fmt::{self, Debug},
477 io::Result as IOResult,
478 pin::Pin,
479 task::{Context, Poll},
480 };
481
482 enum AsyncListVersionWithStep<'a> {
483 V1(AsyncListV1Step<'a>),
484 V2(AsyncListV2Step<'a>),
485 }
486
487 impl From<ListVersion> for AsyncListVersionWithStep<'_> {
488 fn from(version: ListVersion) -> Self {
489 match version {
490 ListVersion::V1 => Self::V1(Default::default()),
491 ListVersion::V2 => Self::V2(Default::default()),
492 }
493 }
494 }
495
496 enum AsyncListV1Step<'a> {
497 FromBuffer {
498 buffer: VecDeque<ListedObjectEntry>,
499 },
500 WaitForResponse {
501 task: BoxFuture<'a, ApiResult<Response<GetObjectsV1ResponseBody>>>,
502 },
503 WaitForRegionProvider {
504 task: BoxFuture<'a, IOResult<&'a dyn RegionsProvider>>,
505 },
506 Done,
507 }
508
509 impl Default for AsyncListV1Step<'_> {
510 #[inline]
511 fn default() -> Self {
512 Self::FromBuffer { buffer: Default::default() }
513 }
514 }
515
516 impl Debug for AsyncListV1Step<'_> {
517 #[inline]
518 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
519 match self {
520 Self::FromBuffer { buffer } => f.debug_tuple("FromBuffer").field(buffer).finish(),
521 Self::WaitForResponse { .. } => f.debug_tuple("WaitForResponse").finish(),
522 Self::WaitForRegionProvider { .. } => f.debug_tuple("WaitForRegionProvider").finish(),
523 Self::Done => f.debug_tuple("Done").finish(),
524 }
525 }
526 }
527
528 trait StreamWithMarker: Stream {
529 fn marker(&self) -> Option<&str>;
530 }
531 type BoxStreamWithMarker<'a, T> = Pin<Box<dyn StreamWithMarker<Item = T> + Send + 'a>>;
532 type ListedObjectEntryResultStream<'a> = BoxStreamWithMarker<'a, ApiResult<ListedObjectEntry>>;
533
534 #[must_use]
541 #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
542 pub struct ListStream<'a>(ListedObjectEntryResultStream<'a>);
543
544 impl<'a> ListStream<'a> {
545 pub(in super::super) fn new(
546 bucket: &'a Bucket,
547 limit: Option<usize>,
548 prefix: Option<Cow<'a, str>>,
549 marker: Option<Cow<'a, str>>,
550 need_parts: bool,
551 version: ListVersion,
552 callbacks: Callbacks<'a>,
553 ) -> Self {
554 Self(match version {
555 ListVersion::V1 => v1_next(bucket, limit, prefix, marker, need_parts, callbacks),
556 ListVersion::V2 => v2_next(bucket, limit, prefix, marker, need_parts, callbacks),
557 })
558 }
559
560 #[allow(dead_code)]
561 fn assert() {
562 assert_impl!(Send: Self);
563 }
565
566 pub fn marker(&self) -> Option<&str> {
568 self.0.marker()
569 }
570 }
571
572 impl Stream for ListStream<'_> {
573 type Item = ApiResult<ListedObjectEntry>;
574
575 #[inline]
576 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
577 self.0.poll_next_unpin(cx)
578 }
579 }
580
581 impl Debug for ListStream<'_> {
582 #[inline]
583 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
584 f.debug_struct("ListStream").finish()
585 }
586 }
587
588 #[derive(Debug)]
589 struct ListV1Stream<'a> {
590 params: ListParams<'a>,
591 callbacks: Callbacks<'a>,
592 current_step: AsyncListV1Step<'a>,
593 }
594
595 fn v1_next<'a>(
596 bucket: &'a Bucket,
597 limit: Option<usize>,
598 prefix: Option<Cow<'a, str>>,
599 marker: Option<Cow<'a, str>>,
600 need_parts: bool,
601 callbacks: Callbacks<'a>,
602 ) -> ListedObjectEntryResultStream<'a> {
603 let params = ListParams {
604 bucket,
605 prefix,
606 need_parts,
607 limit: Limit::new(limit, ListVersion::V1),
608 marker: Marker::new(marker),
609 };
610 Box::pin(ListV1Stream {
611 params,
612 callbacks,
613 current_step: Default::default(),
614 })
615 }
616
617 impl Stream for ListV1Stream<'_> {
618 type Item = ApiResult<ListedObjectEntry>;
619
620 #[inline]
621 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
622 match self.current_step {
623 AsyncListV1Step::FromBuffer { .. } => self.read_from_buffer(cx),
624 AsyncListV1Step::WaitForResponse { .. } => self.wait_for_response(cx),
625 AsyncListV1Step::WaitForRegionProvider { .. } => self.wait_for_region(cx),
626 AsyncListV1Step::Done => Poll::Ready(None),
627 }
628 }
629 }
630
631 impl StreamWithMarker for ListV1Stream<'_> {
632 fn marker(&self) -> Option<&str> {
633 self.params.marker.as_ref()
634 }
635 }
636
637 impl ListV1Stream<'_> {
638 fn read_from_buffer(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
639 if let AsyncListV1Step::FromBuffer { buffer } = &mut self.current_step {
640 if let Some(object) = buffer.pop_front() {
641 Poll::Ready(Some(Ok(object)))
642 } else {
643 if self.params.have_done() {
644 self.current_step = AsyncListV1Step::Done;
645 } else {
646 let bucket = self.params.bucket;
647 self.current_step = AsyncListV1Step::WaitForRegionProvider {
648 task: Box::pin(async move { bucket.async_region_provider().await }),
649 };
650 }
651 self.poll_next(cx)
652 }
653 } else {
654 unreachable!()
655 }
656 }
657
658 fn wait_for_region(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
659 if let AsyncListV1Step::WaitForRegionProvider { task } = &mut self.current_step {
660 match ready!(task.poll_unpin(cx)) {
661 Ok(region_provider) => {
662 let credential = self.params.bucket.objects_manager().credential();
663 let mut request = self
664 .params
665 .bucket
666 .objects_manager()
667 .client()
668 .storage()
669 .get_objects()
670 .new_async_request(RegionsProviderEndpoints::new(region_provider), credential);
671 request.query_pairs(self.params.to_query_params());
672 if let Err(err) = self.callbacks.before_request(request.parts_mut()) {
673 self.current_step = AsyncListV1Step::Done;
674 Poll::Ready(Some(Err(make_callback_error(err))))
675 } else {
676 self.current_step = AsyncListV1Step::WaitForResponse {
677 task: Box::pin(async move { request.call().await }),
678 };
679 self.poll_next(cx)
680 }
681 }
682 Err(err) => {
683 self.current_step = AsyncListV1Step::Done;
684 Poll::Ready(Some(Err(err.into())))
685 }
686 }
687 } else {
688 unreachable!()
689 }
690 }
691
692 fn wait_for_response(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
693 if let AsyncListV1Step::WaitForResponse { task } = &mut self.current_step {
694 let mut response_result = ready!(task.poll_unpin(cx));
695 if let Err(err) = self.callbacks.after_response(&mut response_result) {
696 self.current_step = AsyncListV1Step::Done;
697 Poll::Ready(Some(Err(make_callback_error(err))))
698 } else {
699 match response_result {
700 Ok(response) => {
701 let body = response.into_body();
702 let listed_object_entries = body.get_items().to_listed_object_entry_vec();
703 self.params.marker.set(body.get_marker_as_str());
704 self.params.limit.saturating_decrease(listed_object_entries.len());
705 self.current_step = AsyncListV1Step::FromBuffer {
706 buffer: listed_object_entries.into(),
707 };
708 self.poll_next(cx)
709 }
710 Err(err) => {
711 self.current_step = AsyncListV1Step::Done;
712 Poll::Ready(Some(Err(err)))
713 }
714 }
715 }
716 } else {
717 unreachable!()
718 }
719 }
720 }
721
722 #[derive(Default)]
723 enum AsyncListV2Step<'a> {
724 #[default]
725 Start,
726 WaitForRegionProvider {
727 task: BoxFuture<'a, IOResult<&'a dyn RegionsProvider>>,
728 },
729 WaitForResponse {
730 task: BoxFuture<'a, ApiResult<Response<AsyncResponseBody>>>,
731 },
732 WaitForEntries {
733 lines: AsyncLines<AsyncBufReader<AsyncResponseBody>>,
734 empty: bool,
735 },
736 Done,
737 }
738
739 struct ListV2Stream<'a> {
740 params: ListParams<'a>,
741 callbacks: Callbacks<'a>,
742 current_step: AsyncListV2Step<'a>,
743 }
744
745 #[allow(clippy::too_many_arguments)]
746 fn v2_next<'a>(
747 bucket: &'a Bucket,
748 limit: Option<usize>,
749 prefix: Option<Cow<'a, str>>,
750 marker: Option<Cow<'a, str>>,
751 need_parts: bool,
752 callbacks: Callbacks<'a>,
753 ) -> ListedObjectEntryResultStream<'a> {
754 let params = ListParams {
755 bucket,
756 prefix,
757 need_parts,
758 limit: Limit::new(limit, ListVersion::V2),
759 marker: Marker::new(marker),
760 };
761 Box::pin(ListV2Stream {
762 params,
763 callbacks,
764 current_step: Default::default(),
765 })
766 }
767
768 impl Stream for ListV2Stream<'_> {
769 type Item = ApiResult<ListedObjectEntry>;
770
771 #[inline]
772 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
773 match self.current_step {
774 AsyncListV2Step::Start { .. } => self.start(cx),
775 AsyncListV2Step::WaitForResponse { .. } => self.wait_for_response(cx),
776 AsyncListV2Step::WaitForRegionProvider { .. } => self.wait_for_region(cx),
777 AsyncListV2Step::WaitForEntries { .. } => self.wait_for_entries(cx),
778 AsyncListV2Step::Done => Poll::Ready(None),
779 }
780 }
781 }
782
783 impl StreamWithMarker for ListV2Stream<'_> {
784 fn marker(&self) -> Option<&str> {
785 self.params.marker.as_ref()
786 }
787 }
788
789 impl ListV2Stream<'_> {
790 fn start(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
791 if let AsyncListV2Step::Start { .. } = &mut self.current_step {
792 if self.params.have_done() {
793 self.current_step = AsyncListV2Step::Done;
794 } else {
795 let bucket = self.params.bucket;
796 self.current_step = AsyncListV2Step::WaitForRegionProvider {
797 task: Box::pin(async move { bucket.async_region_provider().await }),
798 };
799 }
800 self.poll_next(cx)
801 } else {
802 unreachable!()
803 }
804 }
805
806 fn wait_for_region(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
807 if let AsyncListV2Step::WaitForRegionProvider { task } = &mut self.current_step {
808 match ready!(task.poll_unpin(cx)) {
809 Ok(region_provider) => {
810 let credential = self.params.bucket.objects_manager().credential();
811 let mut request = self
812 .params
813 .bucket
814 .objects_manager()
815 .client()
816 .storage()
817 .get_objects_v2()
818 .new_async_request(RegionsProviderEndpoints::new(region_provider), credential);
819 request.query_pairs(self.params.to_query_params());
820 if let Err(err) = self.callbacks.before_request(request.parts_mut()) {
821 self.current_step = AsyncListV2Step::Done;
822 Poll::Ready(Some(Err(make_callback_error(err))))
823 } else {
824 self.current_step = AsyncListV2Step::WaitForResponse {
825 task: Box::pin(async move { request.call().await }),
826 };
827 self.poll_next(cx)
828 }
829 }
830 Err(err) => {
831 self.current_step = AsyncListV2Step::Done;
832 Poll::Ready(Some(Err(err.into())))
833 }
834 }
835 } else {
836 unreachable!()
837 }
838 }
839
840 fn wait_for_response(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
841 if let AsyncListV2Step::WaitForResponse { task } = &mut self.current_step {
842 let mut response_result = ready!(task.poll_unpin(cx));
843 if let Err(err) = self.callbacks.after_response(&mut response_result) {
844 self.current_step = AsyncListV2Step::Done;
845 Poll::Ready(Some(Err(make_callback_error(err))))
846 } else {
847 match response_result {
848 Ok(response) => {
849 self.current_step = AsyncListV2Step::WaitForEntries {
850 lines: AsyncBufReader::new(response.into_body()).lines(),
851 empty: true,
852 };
853 self.poll_next(cx)
854 }
855 Err(err) => {
856 self.current_step = AsyncListV2Step::Done;
857 Poll::Ready(Some(Err(err)))
858 }
859 }
860 }
861 } else {
862 unreachable!()
863 }
864 }
865
866 fn wait_for_entries(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
867 if let AsyncListV2Step::WaitForEntries { lines, ref mut empty } = &mut self.current_step {
868 match ready!(lines.poll_next_unpin(cx)) {
869 Some(Ok(line)) if line.is_empty() => self.wait_for_entries(cx),
870 Some(Ok(line)) => match serde_json::from_str::<ListedObjectEntryV2>(&line) {
871 Ok(parsed) => {
872 *empty = false;
873 self.params.marker.set(parsed.marker.as_deref());
874 if let Some(item) = parsed.item {
875 self.params.limit.saturating_decrease(1);
876 Poll::Ready(Some(Ok(item)))
877 } else {
878 self.wait_for_entries(cx)
879 }
880 }
881 Err(err) => {
882 self.current_step = AsyncListV2Step::Done;
883 Poll::Ready(Some(Err(err.into())))
884 }
885 },
886 Some(Err(err)) => {
887 self.current_step = AsyncListV2Step::Done;
888 Poll::Ready(Some(Err(err.into())))
889 }
890 None if *empty => {
891 self.current_step = AsyncListV2Step::Done;
892 Poll::Ready(None)
893 }
894 None => {
895 self.current_step = AsyncListV2Step::Start;
896 self.poll_next(cx)
897 }
898 }
899 } else {
900 unreachable!()
901 }
902 }
903 }
904}
905
906pub(super) fn make_callback_error(err: AnyError) -> ResponseError {
907 ResponseError::new_with_msg(HttpResponseErrorKind::CallbackError.into(), err)
908}
909
910#[cfg(feature = "async")]
911pub use async_list_stream::*;
912
913#[cfg(test)]
914mod tests {
915 use super::{super::ObjectsManager, *};
916 use anyhow::Error as AnyError;
917 use qiniu_apis::{
918 credential::Credential,
919 http::{HeaderValue, HttpCaller, StatusCode, SyncRequest, SyncResponse, SyncResponseResult},
920 http_client::{BucketName, DirectChooser, HttpClient, NeverRetrier, Region, ResponseErrorKind, NO_BACKOFF},
921 };
922 use serde_json::{json, to_string as json_to_string, to_vec as json_to_vec};
923 use std::{
924 sync::{
925 atomic::{AtomicUsize, Ordering},
926 Arc,
927 },
928 time::{SystemTime, UNIX_EPOCH},
929 };
930
931 #[cfg(feature = "async")]
932 use {
933 futures::{future::BoxFuture, StreamExt, TryStreamExt},
934 qiniu_apis::http::{AsyncRequest, AsyncResponse, AsyncResponseResult},
935 };
936
937 #[test]
938 fn test_sync_list_v1() -> anyhow::Result<()> {
939 env_logger::builder().is_test(true).try_init().ok();
940
941 #[derive(Debug, Default)]
942 struct FakeHttpCaller {
943 counter: AtomicUsize,
944 }
945
946 impl HttpCaller for FakeHttpCaller {
947 fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
948 let n = self.counter.fetch_add(1, Ordering::SeqCst);
949 let body = match n {
950 0 => {
951 assert!(request
952 .url()
953 .to_string()
954 .ends_with("/list?bucket=fakebucketname&limit=1000"));
955 SyncResponseBody::from_bytes(
956 json_to_vec(&json!({
957 "marker": "fakemarker",
958 "items": [{
959 "key": "fakeobj1",
960 "put_time": generate_put_time(),
961 "hash": "fakeobj1hash",
962 "fsize": 1usize,
963 "mime_type": "text/plain",
964 }, {
965 "key": "fakeobj2",
966 "put_time": generate_put_time(),
967 "hash": "fakeobj2hash",
968 "fsize": 2usize,
969 "mime_type": "text/plain",
970 }]
971 }))
972 .unwrap(),
973 )
974 }
975 1 => {
976 assert!(request
977 .url()
978 .to_string()
979 .ends_with("/list?bucket=fakebucketname&marker=fakemarker&limit=1000"));
980 SyncResponseBody::from_bytes(
981 json_to_vec(&json!({
982 "marker": "",
983 "items": [{
984 "key": "fakeobj3",
985 "put_time": generate_put_time(),
986 "hash": "fakeobj3hash",
987 "fsize": 3usize,
988 "mime_type": "text/plain",
989 }, {
990 "key": "fakeobj4",
991 "put_time": generate_put_time(),
992 "hash": "fakeobj4hash",
993 "fsize": 4usize,
994 "mime_type": "text/plain",
995 }]
996 }))
997 .unwrap(),
998 )
999 }
1000 _ => unreachable!(),
1001 };
1002 Ok(SyncResponse::builder()
1003 .status_code(StatusCode::OK)
1004 .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1005 .body(body)
1006 .build())
1007 }
1008
1009 #[cfg(feature = "async")]
1010 fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
1011 unreachable!()
1012 }
1013 }
1014
1015 let mut counter = 0usize;
1016 let bucket = get_bucket(FakeHttpCaller::default());
1017 let mut iter = bucket.list().version(ListVersion::V1).iter();
1018 for (i, entry) in (&mut iter).enumerate() {
1019 counter += 1;
1020 let entry = entry?;
1021 assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
1022 assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
1023 assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
1024 }
1025 assert_eq!(iter.marker(), Some(""));
1026 assert_eq!(counter, 4usize);
1027
1028 Ok(())
1029 }
1030
1031 #[test]
1032 fn test_sync_list_v1_with_error() -> anyhow::Result<()> {
1033 env_logger::builder().is_test(true).try_init().ok();
1034
1035 #[derive(Debug, Default)]
1036 struct FakeHttpCaller {
1037 counter: AtomicUsize,
1038 }
1039
1040 impl HttpCaller for FakeHttpCaller {
1041 fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
1042 let n = self.counter.fetch_add(1, Ordering::SeqCst);
1043 let (code, body) = match n {
1044 0 => {
1045 assert!(request
1046 .url()
1047 .to_string()
1048 .ends_with("/list?bucket=fakebucketname&limit=1000"));
1049 (
1050 StatusCode::OK,
1051 SyncResponseBody::from_bytes(
1052 json_to_vec(&json!({
1053 "marker": "fakemarker",
1054 "items": [{
1055 "key": "fakeobj1",
1056 "put_time": generate_put_time(),
1057 "hash": "fakeobj1hash",
1058 "fsize": 1usize,
1059 "mime_type": "text/plain",
1060 }, {
1061 "key": "fakeobj2",
1062 "put_time": generate_put_time(),
1063 "hash": "fakeobj2hash",
1064 "fsize": 2usize,
1065 "mime_type": "text/plain",
1066 }]
1067 }))
1068 .unwrap(),
1069 ),
1070 )
1071 }
1072 1 => {
1073 assert!(request
1074 .url()
1075 .to_string()
1076 .ends_with("/list?bucket=fakebucketname&marker=fakemarker&limit=1000"));
1077 (
1078 StatusCode::from_u16(599).unwrap(),
1079 SyncResponseBody::from_bytes(
1080 json_to_vec(&json!({
1081 "error": "Test Error"
1082 }))
1083 .unwrap(),
1084 ),
1085 )
1086 }
1087 _ => unreachable!(),
1088 };
1089 Ok(SyncResponse::builder()
1090 .status_code(code)
1091 .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1092 .body(body)
1093 .build())
1094 }
1095
1096 #[cfg(feature = "async")]
1097 fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
1098 unreachable!()
1099 }
1100 }
1101
1102 let before_request_callback_counter = Arc::new(AtomicUsize::new(0));
1103 let after_response_ok_callback_counter = Arc::new(AtomicUsize::new(0));
1104 let after_response_error_callback_counter = Arc::new(AtomicUsize::new(0));
1105 let bucket = get_bucket(FakeHttpCaller::default());
1106 let mut iter = bucket
1107 .list()
1108 .version(ListVersion::V1)
1109 .before_request_callback({
1110 let before_request_callback_counter = before_request_callback_counter.to_owned();
1111 move |_| {
1112 before_request_callback_counter.fetch_add(1, Ordering::Relaxed);
1113 Ok(())
1114 }
1115 })
1116 .after_response_ok_callback({
1117 let after_response_ok_callback_counter = after_response_ok_callback_counter.to_owned();
1118 move |_| {
1119 after_response_ok_callback_counter.fetch_add(1, Ordering::Relaxed);
1120 Ok(())
1121 }
1122 })
1123 .after_response_error_callback({
1124 let after_response_error_callback_counter = after_response_error_callback_counter.to_owned();
1125 move |_| {
1126 after_response_error_callback_counter.fetch_add(1, Ordering::Relaxed);
1127 Ok(())
1128 }
1129 })
1130 .iter();
1131 let mut entry = iter.next().unwrap()?;
1132 assert_eq!(entry.get_key_as_str(), "fakeobj1");
1133 assert_eq!(entry.get_hash_as_str(), "fakeobj1hash");
1134 assert_eq!(entry.get_size_as_u64(), 1u64);
1135
1136 entry = iter.next().unwrap()?;
1137 assert_eq!(entry.get_key_as_str(), "fakeobj2");
1138 assert_eq!(entry.get_hash_as_str(), "fakeobj2hash");
1139 assert_eq!(entry.get_size_as_u64(), 2u64);
1140
1141 let err = iter.next().unwrap().unwrap_err();
1142 assert_eq!(
1143 err.kind(),
1144 ResponseErrorKind::StatusCodeError(StatusCode::from_u16(599)?)
1145 );
1146 assert!(iter.next().is_none());
1147 assert_eq!(iter.marker(), Some("fakemarker"));
1148 assert_eq!(before_request_callback_counter.load(Ordering::Relaxed), 2usize);
1149 assert_eq!(after_response_ok_callback_counter.load(Ordering::Relaxed), 1usize);
1150 assert_eq!(after_response_error_callback_counter.load(Ordering::Relaxed), 1usize);
1151
1152 Ok(())
1153 }
1154
1155 #[test]
1156 fn test_sync_list_v1_with_prefix_and_limitation() -> anyhow::Result<()> {
1157 env_logger::builder().is_test(true).try_init().ok();
1158
1159 #[derive(Debug, Default)]
1160 struct FakeHttpCaller {
1161 counter: AtomicUsize,
1162 }
1163
1164 impl HttpCaller for FakeHttpCaller {
1165 fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
1166 let n = self.counter.fetch_add(1, Ordering::SeqCst);
1167 let body = match n {
1168 0 => {
1169 assert!(request
1170 .url()
1171 .to_string()
1172 .ends_with("/list?bucket=fakebucketname&limit=3&prefix=fakeobj"));
1173 SyncResponseBody::from_bytes(
1174 json_to_vec(&json!({
1175 "marker": "fakemarker",
1176 "items": [{
1177 "key": "fakeobj1",
1178 "put_time": generate_put_time(),
1179 "hash": "fakeobj1hash",
1180 "fsize": 1usize,
1181 "mime_type": "text/plain",
1182 }, {
1183 "key": "fakeobj2",
1184 "put_time": generate_put_time(),
1185 "hash": "fakeobj2hash",
1186 "fsize": 2usize,
1187 "mime_type": "text/plain",
1188 }]
1189 }))
1190 .unwrap(),
1191 )
1192 }
1193 1 => {
1194 assert!(request
1195 .url()
1196 .to_string()
1197 .ends_with("/list?bucket=fakebucketname&marker=fakemarker&limit=1&prefix=fakeobj"));
1198 SyncResponseBody::from_bytes(
1199 json_to_vec(&json!({
1200 "marker": "",
1201 "items": [{
1202 "key": "fakeobj3",
1203 "put_time": generate_put_time(),
1204 "hash": "fakeobj3hash",
1205 "fsize": 3usize,
1206 "mime_type": "text/plain",
1207 }]
1208 }))
1209 .unwrap(),
1210 )
1211 }
1212 _ => unreachable!(),
1213 };
1214 Ok(SyncResponse::builder()
1215 .status_code(StatusCode::OK)
1216 .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1217 .body(body)
1218 .build())
1219 }
1220
1221 #[cfg(feature = "async")]
1222 fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
1223 unreachable!()
1224 }
1225 }
1226
1227 let mut counter = 0usize;
1228 let bucket = get_bucket(FakeHttpCaller::default());
1229 let mut iter = bucket.list().version(ListVersion::V1).prefix("fakeobj").limit(3).iter();
1230 for (i, entry) in (&mut iter).enumerate() {
1231 counter += 1;
1232 let entry = entry?;
1233 assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
1234 assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
1235 assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
1236 }
1237 assert_eq!(iter.marker(), Some(""));
1238 assert_eq!(counter, 3usize);
1239
1240 Ok(())
1241 }
1242
1243 #[test]
1244 fn test_sync_list_v1_with_cancellation() -> anyhow::Result<()> {
1245 env_logger::builder().is_test(true).try_init().ok();
1246
1247 #[derive(Debug, Default)]
1248 struct FakeHttpCaller {
1249 counter: AtomicUsize,
1250 }
1251
1252 impl HttpCaller for FakeHttpCaller {
1253 fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
1254 let n = self.counter.fetch_add(1, Ordering::SeqCst);
1255 let body = match n {
1256 0 => {
1257 assert!(request
1258 .url()
1259 .to_string()
1260 .ends_with("/list?bucket=fakebucketname&limit=1000"));
1261 SyncResponseBody::from_bytes(
1262 json_to_vec(&json!({
1263 "marker": "fakemarker",
1264 "items": [{
1265 "key": "fakeobj1",
1266 "put_time": generate_put_time(),
1267 "hash": "fakeobj1hash",
1268 "fsize": 1usize,
1269 "mime_type": "text/plain",
1270 }, {
1271 "key": "fakeobj2",
1272 "put_time": generate_put_time(),
1273 "hash": "fakeobj2hash",
1274 "fsize": 2usize,
1275 "mime_type": "text/plain",
1276 }]
1277 }))
1278 .unwrap(),
1279 )
1280 }
1281 _ => unreachable!(),
1282 };
1283 Ok(SyncResponse::builder()
1284 .status_code(StatusCode::OK)
1285 .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1286 .body(body)
1287 .build())
1288 }
1289
1290 #[cfg(feature = "async")]
1291 fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
1292 unreachable!()
1293 }
1294 }
1295
1296 let counter = Arc::new(AtomicUsize::new(0));
1297 let bucket = get_bucket(FakeHttpCaller::default());
1298 {
1299 let mut iter = bucket
1300 .list()
1301 .version(ListVersion::V1)
1302 .before_request_callback({
1303 let counter = counter.to_owned();
1304 move |_| {
1305 if counter.load(Ordering::Relaxed) > 0 {
1306 Err(AnyError::msg("Fake error"))
1307 } else {
1308 Ok(())
1309 }
1310 }
1311 })
1312 .iter();
1313 for (i, entry) in (&mut iter).enumerate() {
1314 if counter.fetch_add(1, Ordering::Relaxed) < 2 {
1315 let entry = entry?;
1316 assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
1317 assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
1318 assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
1319 } else {
1320 let err = entry.unwrap_err();
1321 assert!(matches!(
1322 err.kind(),
1323 ResponseErrorKind::HttpError(HttpResponseErrorKind::CallbackError { .. })
1324 ));
1325 break;
1326 }
1327 }
1328 assert_eq!(iter.marker(), Some("fakemarker"));
1329 }
1330 assert_eq!(Arc::try_unwrap(counter).unwrap().into_inner(), 3usize);
1331
1332 Ok(())
1333 }
1334
1335 #[test]
1336 fn test_sync_list_v2() -> anyhow::Result<()> {
1337 env_logger::builder().is_test(true).try_init().ok();
1338
1339 #[derive(Debug, Default)]
1340 struct FakeHttpCaller {
1341 counter: AtomicUsize,
1342 }
1343
1344 impl HttpCaller for FakeHttpCaller {
1345 fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
1346 let n = self.counter.fetch_add(1, Ordering::SeqCst);
1347 let body = match n {
1348 0 => {
1349 assert!(request.url().to_string().ends_with("/v2/list?bucket=fakebucketname"));
1350 SyncResponseBody::from_bytes(
1351 [
1352 json_to_string(&json!({
1353 "item": {
1354 "key": "fakeobj1",
1355 "put_time": generate_put_time(),
1356 "hash": "fakeobj1hash",
1357 "fsize": 1usize,
1358 "mime_type": "text/plain",
1359 },
1360 "marker": "fakemarkerobj1",
1361 }))
1362 .unwrap(),
1363 json_to_string(&json!({
1364 "item": {
1365 "key": "fakeobj2",
1366 "put_time": generate_put_time(),
1367 "hash": "fakeobj2hash",
1368 "fsize": 2usize,
1369 "mime_type": "text/plain",
1370 },
1371 "marker": "fakemarkerobj2",
1372 }))
1373 .unwrap(),
1374 ]
1375 .join("\n")
1376 .as_bytes()
1377 .to_owned(),
1378 )
1379 }
1380 1 => {
1381 assert!(request
1382 .url()
1383 .to_string()
1384 .ends_with("/list?bucket=fakebucketname&marker=fakemarkerobj2"));
1385 SyncResponseBody::from_bytes(
1386 [
1387 json_to_string(&json!({
1388 "item": {
1389 "key": "fakeobj3",
1390 "put_time": generate_put_time(),
1391 "hash": "fakeobj3hash",
1392 "fsize": 3usize,
1393 "mime_type": "text/plain",
1394 },
1395 "marker": "fakemarkerobj3",
1396 }))
1397 .unwrap(),
1398 json_to_string(&json!({
1399 "item": {
1400 "key": "fakeobj4",
1401 "put_time": generate_put_time(),
1402 "hash": "fakeobj4hash",
1403 "fsize": 4usize,
1404 "mime_type": "text/plain",
1405 },
1406 "marker": "",
1407 }))
1408 .unwrap(),
1409 ]
1410 .join("\n")
1411 .as_bytes()
1412 .to_owned(),
1413 )
1414 }
1415 _ => unreachable!(),
1416 };
1417 Ok(SyncResponse::builder()
1418 .status_code(StatusCode::OK)
1419 .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1420 .body(body)
1421 .build())
1422 }
1423
1424 #[cfg(feature = "async")]
1425 fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
1426 unreachable!()
1427 }
1428 }
1429
1430 let mut counter = 0usize;
1431 let bucket = get_bucket(FakeHttpCaller::default());
1432 let mut iter = bucket.list().version(ListVersion::V2).iter();
1433 for (i, entry) in (&mut iter).enumerate() {
1434 counter += 1;
1435 let entry = entry?;
1436 assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
1437 assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
1438 assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
1439 }
1440 assert_eq!(counter, 4usize);
1441 assert_eq!(iter.marker(), Some(""));
1442
1443 Ok(())
1444 }
1445
1446 #[test]
1447 fn test_sync_list_v2_with_non_results() -> anyhow::Result<()> {
1448 env_logger::builder().is_test(true).try_init().ok();
1449
1450 #[derive(Debug, Default)]
1451 struct FakeHttpCaller {
1452 counter: AtomicUsize,
1453 }
1454
1455 impl HttpCaller for FakeHttpCaller {
1456 fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
1457 let n = self.counter.fetch_add(1, Ordering::SeqCst);
1458 let body = match n {
1459 0 => {
1460 assert!(request
1461 .url()
1462 .to_string()
1463 .ends_with("/v2/list?bucket=fakebucketname&prefix=non-existed"));
1464 SyncResponseBody::from_bytes(Vec::new())
1465 }
1466 _ => unreachable!(),
1467 };
1468 Ok(SyncResponse::builder()
1469 .status_code(StatusCode::OK)
1470 .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1471 .body(body)
1472 .build())
1473 }
1474
1475 #[cfg(feature = "async")]
1476 fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
1477 unreachable!()
1478 }
1479 }
1480
1481 let mut counter = 0usize;
1482 let bucket = get_bucket(FakeHttpCaller::default());
1483 let mut iter = bucket.list().version(ListVersion::V2).prefix("non-existed").iter();
1484 for _entry in &mut iter {
1485 counter += 1;
1486 }
1487 assert_eq!(counter, 0usize);
1488 assert_eq!(iter.marker(), None);
1489
1490 Ok(())
1491 }
1492
1493 #[test]
1494 fn test_sync_list_v2_with_error() -> anyhow::Result<()> {
1495 env_logger::builder().is_test(true).try_init().ok();
1496
1497 #[derive(Debug, Default)]
1498 struct FakeHttpCaller {
1499 counter: AtomicUsize,
1500 }
1501
1502 impl HttpCaller for FakeHttpCaller {
1503 fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
1504 let n = self.counter.fetch_add(1, Ordering::SeqCst);
1505 let (code, body) = match n {
1506 0 => {
1507 assert!(request.url().to_string().ends_with("/v2/list?bucket=fakebucketname"));
1508 (
1509 StatusCode::OK,
1510 SyncResponseBody::from_bytes(
1511 [
1512 json_to_string(&json!({
1513 "item": {
1514 "key": "fakeobj1",
1515 "put_time": generate_put_time(),
1516 "hash": "fakeobj1hash",
1517 "fsize": 1usize,
1518 "mime_type": "text/plain",
1519 },
1520 "marker": "fakemarkerobj1",
1521 }))
1522 .unwrap(),
1523 json_to_string(&json!({
1524 "item": {
1525 "key": "fakeobj2",
1526 "put_time": generate_put_time(),
1527 "hash": "fakeobj2hash",
1528 "fsize": 2usize,
1529 "mime_type": "text/plain",
1530 },
1531 "marker": "fakemarkerobj2",
1532 }))
1533 .unwrap(),
1534 ]
1535 .join("\n")
1536 .as_bytes()
1537 .to_owned(),
1538 ),
1539 )
1540 }
1541 1 => {
1542 assert!(request
1543 .url()
1544 .to_string()
1545 .ends_with("/v2/list?bucket=fakebucketname&marker=fakemarkerobj2"));
1546 (
1547 StatusCode::from_u16(599).unwrap(),
1548 SyncResponseBody::from_bytes(
1549 json_to_vec(&json!({
1550 "error": "Test Error"
1551 }))
1552 .unwrap(),
1553 ),
1554 )
1555 }
1556 _ => unreachable!(),
1557 };
1558 Ok(SyncResponse::builder()
1559 .status_code(code)
1560 .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1561 .body(body)
1562 .build())
1563 }
1564
1565 #[cfg(feature = "async")]
1566 fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
1567 unreachable!()
1568 }
1569 }
1570
1571 let before_request_callback_counter = Arc::new(AtomicUsize::new(0));
1572 let after_response_ok_callback_counter = Arc::new(AtomicUsize::new(0));
1573 let after_response_error_callback_counter = Arc::new(AtomicUsize::new(0));
1574 let bucket = get_bucket(FakeHttpCaller::default());
1575 let mut iter = bucket
1576 .list()
1577 .version(ListVersion::V2)
1578 .before_request_callback({
1579 let before_request_callback_counter = before_request_callback_counter.to_owned();
1580 move |_| {
1581 before_request_callback_counter.fetch_add(1, Ordering::Relaxed);
1582 Ok(())
1583 }
1584 })
1585 .after_response_ok_callback({
1586 let after_response_ok_callback_counter = after_response_ok_callback_counter.to_owned();
1587 move |_| {
1588 after_response_ok_callback_counter.fetch_add(1, Ordering::Relaxed);
1589 Ok(())
1590 }
1591 })
1592 .after_response_error_callback({
1593 let after_response_error_callback_counter = after_response_error_callback_counter.to_owned();
1594 move |_| {
1595 after_response_error_callback_counter.fetch_add(1, Ordering::Relaxed);
1596 Ok(())
1597 }
1598 })
1599 .iter();
1600 let mut entry = iter.next().unwrap()?;
1601 assert_eq!(entry.get_key_as_str(), "fakeobj1");
1602 assert_eq!(entry.get_hash_as_str(), "fakeobj1hash");
1603 assert_eq!(entry.get_size_as_u64(), 1u64);
1604
1605 entry = iter.next().unwrap()?;
1606 assert_eq!(entry.get_key_as_str(), "fakeobj2");
1607 assert_eq!(entry.get_hash_as_str(), "fakeobj2hash");
1608 assert_eq!(entry.get_size_as_u64(), 2u64);
1609
1610 let err = iter.next().unwrap().unwrap_err();
1611 assert_eq!(
1612 err.kind(),
1613 ResponseErrorKind::StatusCodeError(StatusCode::from_u16(599)?)
1614 );
1615 assert!(iter.next().is_none());
1616 assert_eq!(iter.marker(), Some("fakemarkerobj2"));
1617 assert_eq!(before_request_callback_counter.load(Ordering::Relaxed), 2usize);
1618 assert_eq!(after_response_ok_callback_counter.load(Ordering::Relaxed), 1usize);
1619 assert_eq!(after_response_error_callback_counter.load(Ordering::Relaxed), 1usize);
1620
1621 Ok(())
1622 }
1623
1624 #[test]
1625 fn test_sync_list_v2_with_cancellation() -> anyhow::Result<()> {
1626 env_logger::builder().is_test(true).try_init().ok();
1627
1628 #[derive(Debug, Default)]
1629 struct FakeHttpCaller {
1630 counter: AtomicUsize,
1631 }
1632
1633 impl HttpCaller for FakeHttpCaller {
1634 fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
1635 let n = self.counter.fetch_add(1, Ordering::SeqCst);
1636 let body = match n {
1637 0 => {
1638 assert!(request.url().to_string().ends_with("/v2/list?bucket=fakebucketname"));
1639 SyncResponseBody::from_bytes(
1640 [
1641 json_to_string(&json!({
1642 "item": {
1643 "key": "fakeobj1",
1644 "put_time": generate_put_time(),
1645 "hash": "fakeobj1hash",
1646 "fsize": 1usize,
1647 "mime_type": "text/plain",
1648 },
1649 "marker": "fakemarkerobj1",
1650 }))
1651 .unwrap(),
1652 json_to_string(&json!({
1653 "item": {
1654 "key": "fakeobj2",
1655 "put_time": generate_put_time(),
1656 "hash": "fakeobj2hash",
1657 "fsize": 2usize,
1658 "mime_type": "text/plain",
1659 },
1660 "marker": "fakemarkerobj2",
1661 }))
1662 .unwrap(),
1663 ]
1664 .join("\n")
1665 .as_bytes()
1666 .to_owned(),
1667 )
1668 }
1669 _ => unreachable!(),
1670 };
1671 Ok(SyncResponse::builder()
1672 .status_code(StatusCode::OK)
1673 .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1674 .body(body)
1675 .build())
1676 }
1677
1678 #[cfg(feature = "async")]
1679 fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
1680 unreachable!()
1681 }
1682 }
1683
1684 let counter = Arc::new(AtomicUsize::new(0));
1685 let bucket = get_bucket(FakeHttpCaller::default());
1686 {
1687 let mut iter = bucket
1688 .list()
1689 .version(ListVersion::V2)
1690 .before_request_callback({
1691 let counter = counter.to_owned();
1692 move |_| {
1693 if counter.load(Ordering::Relaxed) > 0 {
1694 Err(AnyError::msg("Fake error"))
1695 } else {
1696 Ok(())
1697 }
1698 }
1699 })
1700 .iter();
1701 for (i, entry) in (&mut iter).enumerate() {
1702 if counter.fetch_add(1, Ordering::Relaxed) < 2 {
1703 let entry = entry?;
1704 assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
1705 assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
1706 assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
1707 } else {
1708 let err = entry.unwrap_err();
1709 assert!(matches!(
1710 err.kind(),
1711 ResponseErrorKind::HttpError(HttpResponseErrorKind::CallbackError { .. })
1712 ));
1713 break;
1714 }
1715 }
1716 assert_eq!(iter.marker(), Some("fakemarkerobj2"));
1717 }
1718 assert_eq!(Arc::try_unwrap(counter).unwrap().into_inner(), 3usize);
1719
1720 Ok(())
1721 }
1722
1723 #[async_std::test]
1724 #[cfg(feature = "async")]
1725 async fn test_async_list_v1() -> anyhow::Result<()> {
1726 env_logger::builder().is_test(true).try_init().ok();
1727
1728 #[derive(Debug, Default)]
1729 struct FakeHttpCaller {
1730 counter: AtomicUsize,
1731 }
1732
1733 impl HttpCaller for FakeHttpCaller {
1734 fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
1735 unreachable!()
1736 }
1737
1738 fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
1739 Box::pin(async move {
1740 let n = self.counter.fetch_add(1, Ordering::SeqCst);
1741 let body = match n {
1742 0 => {
1743 assert!(request
1744 .url()
1745 .to_string()
1746 .ends_with("/list?bucket=fakebucketname&limit=1000"));
1747 AsyncResponseBody::from_bytes(
1748 json_to_vec(&json!({
1749 "marker": "fakemarker",
1750 "items": [{
1751 "key": "fakeobj1",
1752 "put_time": generate_put_time(),
1753 "hash": "fakeobj1hash",
1754 "fsize": 1usize,
1755 "mime_type": "text/plain",
1756 }, {
1757 "key": "fakeobj2",
1758 "put_time": generate_put_time(),
1759 "hash": "fakeobj2hash",
1760 "fsize": 2usize,
1761 "mime_type": "text/plain",
1762 }]
1763 }))
1764 .unwrap(),
1765 )
1766 }
1767 1 => {
1768 assert!(request
1769 .url()
1770 .to_string()
1771 .ends_with("/list?bucket=fakebucketname&marker=fakemarker&limit=1000"));
1772 AsyncResponseBody::from_bytes(
1773 json_to_vec(&json!({
1774 "marker": "",
1775 "items": [{
1776 "key": "fakeobj3",
1777 "put_time": generate_put_time(),
1778 "hash": "fakeobj3hash",
1779 "fsize": 3usize,
1780 "mime_type": "text/plain",
1781 }, {
1782 "key": "fakeobj4",
1783 "put_time": generate_put_time(),
1784 "hash": "fakeobj4hash",
1785 "fsize": 4usize,
1786 "mime_type": "text/plain",
1787 }]
1788 }))
1789 .unwrap(),
1790 )
1791 }
1792 _ => unreachable!(),
1793 };
1794 Ok(AsyncResponse::builder()
1795 .status_code(StatusCode::OK)
1796 .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1797 .body(body)
1798 .build())
1799 })
1800 }
1801 }
1802
1803 let mut counter = 0usize;
1804 let bucket = get_bucket(FakeHttpCaller::default());
1805 let mut stream = bucket.list().version(ListVersion::V1).stream();
1806 let mut iter = (&mut stream).enumerate();
1807 while let Some((i, entry)) = iter.next().await {
1808 counter += 1;
1809 let entry = entry?;
1810 assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
1811 assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
1812 assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
1813 }
1814 assert_eq!(stream.marker(), Some(""));
1815 assert_eq!(counter, 4usize);
1816
1817 Ok(())
1818 }
1819
1820 #[async_std::test]
1821 #[cfg(feature = "async")]
1822 async fn test_async_list_v1_with_error() -> anyhow::Result<()> {
1823 env_logger::builder().is_test(true).try_init().ok();
1824
1825 #[derive(Debug, Default)]
1826 struct FakeHttpCaller {
1827 counter: AtomicUsize,
1828 }
1829
1830 impl HttpCaller for FakeHttpCaller {
1831 fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
1832 unreachable!()
1833 }
1834
1835 fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
1836 Box::pin(async move {
1837 let n = self.counter.fetch_add(1, Ordering::SeqCst);
1838 let (code, body) = match n {
1839 0 => {
1840 assert!(request
1841 .url()
1842 .to_string()
1843 .ends_with("/list?bucket=fakebucketname&limit=1000"));
1844 (
1845 StatusCode::OK,
1846 AsyncResponseBody::from_bytes(
1847 json_to_vec(&json!({
1848 "marker": "fakemarker",
1849 "items": [{
1850 "key": "fakeobj1",
1851 "put_time": generate_put_time(),
1852 "hash": "fakeobj1hash",
1853 "fsize": 1usize,
1854 "mime_type": "text/plain",
1855 }, {
1856 "key": "fakeobj2",
1857 "put_time": generate_put_time(),
1858 "hash": "fakeobj2hash",
1859 "fsize": 2usize,
1860 "mime_type": "text/plain",
1861 }]
1862 }))
1863 .unwrap(),
1864 ),
1865 )
1866 }
1867 1 => {
1868 assert!(request
1869 .url()
1870 .to_string()
1871 .ends_with("/list?bucket=fakebucketname&marker=fakemarker&limit=1000"));
1872 (
1873 StatusCode::from_u16(599).unwrap(),
1874 AsyncResponseBody::from_bytes(
1875 json_to_vec(&json!({
1876 "error": "Test Error"
1877 }))
1878 .unwrap(),
1879 ),
1880 )
1881 }
1882 _ => unreachable!(),
1883 };
1884 Ok(AsyncResponse::builder()
1885 .status_code(code)
1886 .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1887 .body(body)
1888 .build())
1889 })
1890 }
1891 }
1892
1893 let before_request_callback_counter = Arc::new(AtomicUsize::new(0));
1894 let after_response_ok_callback_counter = Arc::new(AtomicUsize::new(0));
1895 let after_response_error_callback_counter = Arc::new(AtomicUsize::new(0));
1896 let bucket = get_bucket(FakeHttpCaller::default());
1897 let mut iter = bucket
1898 .list()
1899 .version(ListVersion::V1)
1900 .before_request_callback({
1901 let before_request_callback_counter = before_request_callback_counter.to_owned();
1902 move |_| {
1903 before_request_callback_counter.fetch_add(1, Ordering::Relaxed);
1904 Ok(())
1905 }
1906 })
1907 .after_response_ok_callback({
1908 let after_response_ok_callback_counter = after_response_ok_callback_counter.to_owned();
1909 move |_| {
1910 after_response_ok_callback_counter.fetch_add(1, Ordering::Relaxed);
1911 Ok(())
1912 }
1913 })
1914 .after_response_error_callback({
1915 let after_response_error_callback_counter = after_response_error_callback_counter.to_owned();
1916 move |_| {
1917 after_response_error_callback_counter.fetch_add(1, Ordering::Relaxed);
1918 Ok(())
1919 }
1920 })
1921 .stream();
1922 let mut entry = iter.try_next().await?.unwrap();
1923 assert_eq!(entry.get_key_as_str(), "fakeobj1");
1924 assert_eq!(entry.get_hash_as_str(), "fakeobj1hash");
1925 assert_eq!(entry.get_size_as_u64(), 1u64);
1926
1927 entry = iter.try_next().await?.unwrap();
1928 assert_eq!(entry.get_key_as_str(), "fakeobj2");
1929 assert_eq!(entry.get_hash_as_str(), "fakeobj2hash");
1930 assert_eq!(entry.get_size_as_u64(), 2u64);
1931
1932 let err = iter.try_next().await.unwrap_err();
1933 assert_eq!(
1934 err.kind(),
1935 ResponseErrorKind::StatusCodeError(StatusCode::from_u16(599)?)
1936 );
1937 assert!(iter.try_next().await?.is_none());
1938 assert_eq!(iter.marker(), Some("fakemarker"));
1939 assert_eq!(before_request_callback_counter.load(Ordering::Relaxed), 2usize);
1940 assert_eq!(after_response_ok_callback_counter.load(Ordering::Relaxed), 1usize);
1941 assert_eq!(after_response_error_callback_counter.load(Ordering::Relaxed), 1usize);
1942
1943 Ok(())
1944 }
1945
1946 #[async_std::test]
1947 #[cfg(feature = "async")]
1948 async fn test_async_list_v1_with_prefix_and_limitation() -> anyhow::Result<()> {
1949 env_logger::builder().is_test(true).try_init().ok();
1950
1951 #[derive(Debug, Default)]
1952 struct FakeHttpCaller {
1953 counter: AtomicUsize,
1954 }
1955
1956 impl HttpCaller for FakeHttpCaller {
1957 fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
1958 unreachable!()
1959 }
1960
1961 fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
1962 Box::pin(async move {
1963 let n = self.counter.fetch_add(1, Ordering::SeqCst);
1964 let body = match n {
1965 0 => {
1966 assert!(request
1967 .url()
1968 .to_string()
1969 .ends_with("/list?bucket=fakebucketname&limit=3&prefix=fakeobj"));
1970 AsyncResponseBody::from_bytes(
1971 json_to_vec(&json!({
1972 "marker": "fakemarker",
1973 "items": [{
1974 "key": "fakeobj1",
1975 "put_time": generate_put_time(),
1976 "hash": "fakeobj1hash",
1977 "fsize": 1usize,
1978 "mime_type": "text/plain",
1979 }, {
1980 "key": "fakeobj2",
1981 "put_time": generate_put_time(),
1982 "hash": "fakeobj2hash",
1983 "fsize": 2usize,
1984 "mime_type": "text/plain",
1985 }]
1986 }))
1987 .unwrap(),
1988 )
1989 }
1990 1 => {
1991 assert!(request
1992 .url()
1993 .to_string()
1994 .ends_with("/list?bucket=fakebucketname&marker=fakemarker&limit=1&prefix=fakeobj"));
1995 AsyncResponseBody::from_bytes(
1996 json_to_vec(&json!({
1997 "marker": "",
1998 "items": [{
1999 "key": "fakeobj3",
2000 "put_time": generate_put_time(),
2001 "hash": "fakeobj3hash",
2002 "fsize": 3usize,
2003 "mime_type": "text/plain",
2004 }]
2005 }))
2006 .unwrap(),
2007 )
2008 }
2009 _ => unreachable!(),
2010 };
2011 Ok(AsyncResponse::builder()
2012 .status_code(StatusCode::OK)
2013 .header("x-reqid", HeaderValue::from_static("FakeReqid"))
2014 .body(body)
2015 .build())
2016 })
2017 }
2018 }
2019
2020 let mut counter = 0usize;
2021 let bucket = get_bucket(FakeHttpCaller::default());
2022 let mut stream = bucket
2023 .list()
2024 .version(ListVersion::V1)
2025 .prefix("fakeobj")
2026 .limit(3)
2027 .stream();
2028 let mut iter = (&mut stream).enumerate();
2029 while let Some((i, entry)) = iter.next().await {
2030 counter += 1;
2031 let entry = entry?;
2032 assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
2033 assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
2034 assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
2035 }
2036 assert_eq!(stream.marker(), Some(""));
2037 assert_eq!(counter, 3usize);
2038
2039 Ok(())
2040 }
2041
2042 #[async_std::test]
2043 #[cfg(feature = "async")]
2044 async fn test_async_list_v1_with_cancellation() -> anyhow::Result<()> {
2045 env_logger::builder().is_test(true).try_init().ok();
2046
2047 #[derive(Debug, Default)]
2048 struct FakeHttpCaller {
2049 counter: AtomicUsize,
2050 }
2051
2052 impl HttpCaller for FakeHttpCaller {
2053 fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
2054 unreachable!()
2055 }
2056
2057 fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
2058 Box::pin(async move {
2059 let n = self.counter.fetch_add(1, Ordering::SeqCst);
2060 let body = match n {
2061 0 => {
2062 assert!(request
2063 .url()
2064 .to_string()
2065 .ends_with("/list?bucket=fakebucketname&limit=1000"));
2066 AsyncResponseBody::from_bytes(
2067 json_to_vec(&json!({
2068 "marker": "fakemarker",
2069 "items": [{
2070 "key": "fakeobj1",
2071 "put_time": generate_put_time(),
2072 "hash": "fakeobj1hash",
2073 "fsize": 1usize,
2074 "mime_type": "text/plain",
2075 }, {
2076 "key": "fakeobj2",
2077 "put_time": generate_put_time(),
2078 "hash": "fakeobj2hash",
2079 "fsize": 2usize,
2080 "mime_type": "text/plain",
2081 }]
2082 }))
2083 .unwrap(),
2084 )
2085 }
2086 _ => unreachable!(),
2087 };
2088 Ok(AsyncResponse::builder()
2089 .status_code(StatusCode::OK)
2090 .header("x-reqid", HeaderValue::from_static("FakeReqid"))
2091 .body(body)
2092 .build())
2093 })
2094 }
2095 }
2096
2097 let counter = Arc::new(AtomicUsize::new(0));
2098 {
2099 let bucket = get_bucket(FakeHttpCaller::default());
2100 let mut stream = bucket
2101 .list()
2102 .version(ListVersion::V1)
2103 .before_request_callback({
2104 let counter = counter.to_owned();
2105 move |_| {
2106 if counter.load(Ordering::Relaxed) > 0 {
2107 Err(AnyError::msg("Fake error"))
2108 } else {
2109 Ok(())
2110 }
2111 }
2112 })
2113 .stream();
2114 let mut iter = (&mut stream).enumerate();
2115 while let Some((i, entry)) = iter.next().await {
2116 if counter.fetch_add(1, Ordering::Relaxed) < 2 {
2117 let entry = entry?;
2118 assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
2119 assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
2120 assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
2121 } else {
2122 let err = entry.unwrap_err();
2123 assert!(matches!(
2124 err.kind(),
2125 ResponseErrorKind::HttpError(HttpResponseErrorKind::CallbackError { .. })
2126 ));
2127 break;
2128 }
2129 }
2130 assert_eq!(stream.marker(), Some("fakemarker"));
2131 }
2132 assert_eq!(Arc::try_unwrap(counter).unwrap().into_inner(), 3usize);
2133
2134 Ok(())
2135 }
2136
2137 #[async_std::test]
2138 #[cfg(feature = "async")]
2139 async fn test_async_list_v2() -> anyhow::Result<()> {
2140 env_logger::builder().is_test(true).try_init().ok();
2141
2142 #[derive(Debug, Default)]
2143 struct FakeHttpCaller {
2144 counter: AtomicUsize,
2145 }
2146
2147 impl HttpCaller for FakeHttpCaller {
2148 fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
2149 unreachable!()
2150 }
2151
2152 fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
2153 Box::pin(async move {
2154 let n = self.counter.fetch_add(1, Ordering::SeqCst);
2155 let body = match n {
2156 0 => {
2157 assert!(request.url().to_string().ends_with("/v2/list?bucket=fakebucketname"));
2158 AsyncResponseBody::from_bytes(
2159 [
2160 json_to_string(&json!({
2161 "item": {
2162 "key": "fakeobj1",
2163 "put_time": generate_put_time(),
2164 "hash": "fakeobj1hash",
2165 "fsize": 1usize,
2166 "mime_type": "text/plain",
2167 },
2168 "marker": "fakemarkerobj1",
2169 }))
2170 .unwrap(),
2171 json_to_string(&json!({
2172 "item": {
2173 "key": "fakeobj2",
2174 "put_time": generate_put_time(),
2175 "hash": "fakeobj2hash",
2176 "fsize": 2usize,
2177 "mime_type": "text/plain",
2178 },
2179 "marker": "fakemarkerobj2",
2180 }))
2181 .unwrap(),
2182 ]
2183 .join("\n")
2184 .as_bytes()
2185 .to_owned(),
2186 )
2187 }
2188 1 => {
2189 assert!(request
2190 .url()
2191 .to_string()
2192 .ends_with("/list?bucket=fakebucketname&marker=fakemarkerobj2"));
2193 AsyncResponseBody::from_bytes(
2194 [
2195 json_to_string(&json!({
2196 "item": {
2197 "key": "fakeobj3",
2198 "put_time": generate_put_time(),
2199 "hash": "fakeobj3hash",
2200 "fsize": 3usize,
2201 "mime_type": "text/plain",
2202 },
2203 "marker": "fakemarkerobj3",
2204 }))
2205 .unwrap(),
2206 json_to_string(&json!({
2207 "item": {
2208 "key": "fakeobj4",
2209 "put_time": generate_put_time(),
2210 "hash": "fakeobj4hash",
2211 "fsize": 4usize,
2212 "mime_type": "text/plain",
2213 },
2214 "marker": "",
2215 }))
2216 .unwrap(),
2217 ]
2218 .join("\n")
2219 .as_bytes()
2220 .to_owned(),
2221 )
2222 }
2223 _ => unreachable!(),
2224 };
2225 Ok(AsyncResponse::builder()
2226 .status_code(StatusCode::OK)
2227 .header("x-reqid", HeaderValue::from_static("FakeReqid"))
2228 .body(body)
2229 .build())
2230 })
2231 }
2232 }
2233
2234 let mut counter = 0usize;
2235 let bucket = get_bucket(FakeHttpCaller::default());
2236 let mut stream = bucket.list().version(ListVersion::V2).stream();
2237 let mut iter = (&mut stream).enumerate();
2238 while let Some((i, entry)) = iter.next().await {
2239 counter += 1;
2240 let entry = entry?;
2241 assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
2242 assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
2243 assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
2244 }
2245 assert_eq!(stream.marker(), Some(""));
2246 assert_eq!(counter, 4usize);
2247
2248 Ok(())
2249 }
2250
2251 #[async_std::test]
2252 #[cfg(feature = "async")]
2253 async fn test_async_list_v2_with_non_results() -> anyhow::Result<()> {
2254 env_logger::builder().is_test(true).try_init().ok();
2255
2256 #[derive(Debug, Default)]
2257 struct FakeHttpCaller {
2258 counter: AtomicUsize,
2259 }
2260
2261 impl HttpCaller for FakeHttpCaller {
2262 fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
2263 unreachable!()
2264 }
2265
2266 fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
2267 Box::pin(async move {
2268 let n = self.counter.fetch_add(1, Ordering::SeqCst);
2269 let body = match n {
2270 0 => {
2271 assert!(request
2272 .url()
2273 .to_string()
2274 .ends_with("/v2/list?bucket=fakebucketname&prefix=non-exist"));
2275 AsyncResponseBody::from_bytes(Vec::new())
2276 }
2277 _ => unreachable!(),
2278 };
2279 Ok(AsyncResponse::builder()
2280 .status_code(StatusCode::OK)
2281 .header("x-reqid", HeaderValue::from_static("FakeReqid"))
2282 .body(body)
2283 .build())
2284 })
2285 }
2286 }
2287
2288 let mut counter = 0usize;
2289 let bucket = get_bucket(FakeHttpCaller::default());
2290 let mut stream = bucket.list().version(ListVersion::V2).prefix("non-exist").stream();
2291 let mut iter = (&mut stream).enumerate();
2292 while let Some((_i, _entry)) = iter.next().await {
2293 counter += 1;
2294 }
2295 assert_eq!(stream.marker(), None);
2296 assert_eq!(counter, 0usize);
2297
2298 Ok(())
2299 }
2300
2301 #[async_std::test]
2302 #[cfg(feature = "async")]
2303 async fn test_async_list_v2_with_error() -> anyhow::Result<()> {
2304 env_logger::builder().is_test(true).try_init().ok();
2305
2306 #[derive(Debug, Default)]
2307 struct FakeHttpCaller {
2308 counter: AtomicUsize,
2309 }
2310
2311 impl HttpCaller for FakeHttpCaller {
2312 fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
2313 unreachable!()
2314 }
2315
2316 fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
2317 Box::pin(async move {
2318 let n = self.counter.fetch_add(1, Ordering::SeqCst);
2319 let (code, body) = match n {
2320 0 => {
2321 assert!(request.url().to_string().ends_with("/v2/list?bucket=fakebucketname"));
2322 (
2323 StatusCode::OK,
2324 AsyncResponseBody::from_bytes(
2325 [
2326 json_to_string(&json!({
2327 "item": {
2328 "key": "fakeobj1",
2329 "put_time": generate_put_time(),
2330 "hash": "fakeobj1hash",
2331 "fsize": 1usize,
2332 "mime_type": "text/plain",
2333 },
2334 "marker": "fakemarkerobj1",
2335 }))
2336 .unwrap(),
2337 json_to_string(&json!({
2338 "item": {
2339 "key": "fakeobj2",
2340 "put_time": generate_put_time(),
2341 "hash": "fakeobj2hash",
2342 "fsize": 2usize,
2343 "mime_type": "text/plain",
2344 },
2345 "marker": "fakemarkerobj2",
2346 }))
2347 .unwrap(),
2348 ]
2349 .join("\n")
2350 .as_bytes()
2351 .to_owned(),
2352 ),
2353 )
2354 }
2355 1 => {
2356 assert!(request
2357 .url()
2358 .to_string()
2359 .ends_with("/v2/list?bucket=fakebucketname&marker=fakemarkerobj2"));
2360 (
2361 StatusCode::from_u16(599).unwrap(),
2362 AsyncResponseBody::from_bytes(
2363 json_to_vec(&json!({
2364 "error": "Test Error"
2365 }))
2366 .unwrap(),
2367 ),
2368 )
2369 }
2370 _ => unreachable!(),
2371 };
2372 Ok(AsyncResponse::builder()
2373 .status_code(code)
2374 .header("x-reqid", HeaderValue::from_static("FakeReqid"))
2375 .body(body)
2376 .build())
2377 })
2378 }
2379 }
2380
2381 let before_request_callback_counter = Arc::new(AtomicUsize::new(0));
2382 let after_response_ok_callback_counter = Arc::new(AtomicUsize::new(0));
2383 let after_response_error_callback_counter = Arc::new(AtomicUsize::new(0));
2384 let bucket = get_bucket(FakeHttpCaller::default());
2385 let mut stream = bucket
2386 .list()
2387 .version(ListVersion::V2)
2388 .before_request_callback({
2389 let before_request_callback_counter = before_request_callback_counter.to_owned();
2390 move |_| {
2391 before_request_callback_counter.fetch_add(1, Ordering::Relaxed);
2392 Ok(())
2393 }
2394 })
2395 .after_response_ok_callback({
2396 let after_response_ok_callback_counter = after_response_ok_callback_counter.to_owned();
2397 move |_| {
2398 after_response_ok_callback_counter.fetch_add(1, Ordering::Relaxed);
2399 Ok(())
2400 }
2401 })
2402 .after_response_error_callback({
2403 let after_response_error_callback_counter = after_response_error_callback_counter.to_owned();
2404 move |_| {
2405 after_response_error_callback_counter.fetch_add(1, Ordering::Relaxed);
2406 Ok(())
2407 }
2408 })
2409 .stream();
2410 let mut entry = stream.try_next().await?.unwrap();
2411 assert_eq!(entry.get_key_as_str(), "fakeobj1");
2412 assert_eq!(entry.get_hash_as_str(), "fakeobj1hash");
2413 assert_eq!(entry.get_size_as_u64(), 1u64);
2414
2415 entry = stream.try_next().await?.unwrap();
2416 assert_eq!(entry.get_key_as_str(), "fakeobj2");
2417 assert_eq!(entry.get_hash_as_str(), "fakeobj2hash");
2418 assert_eq!(entry.get_size_as_u64(), 2u64);
2419
2420 let err = stream.try_next().await.unwrap_err();
2421 assert_eq!(
2422 err.kind(),
2423 ResponseErrorKind::StatusCodeError(StatusCode::from_u16(599)?)
2424 );
2425 assert!(stream.try_next().await?.is_none());
2426 assert_eq!(stream.marker(), Some("fakemarkerobj2"));
2427 assert_eq!(before_request_callback_counter.load(Ordering::Relaxed), 2usize);
2428 assert_eq!(after_response_ok_callback_counter.load(Ordering::Relaxed), 1usize);
2429 assert_eq!(after_response_error_callback_counter.load(Ordering::Relaxed), 1usize);
2430
2431 Ok(())
2432 }
2433
2434 #[async_std::test]
2435 #[cfg(feature = "async")]
2436 async fn test_async_list_v2_with_cancellation() -> anyhow::Result<()> {
2437 env_logger::builder().is_test(true).try_init().ok();
2438
2439 #[derive(Debug, Default)]
2440 struct FakeHttpCaller {
2441 counter: AtomicUsize,
2442 }
2443
2444 impl HttpCaller for FakeHttpCaller {
2445 fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
2446 unreachable!()
2447 }
2448
2449 fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
2450 Box::pin(async move {
2451 let n = self.counter.fetch_add(1, Ordering::SeqCst);
2452 let body = match n {
2453 0 => {
2454 assert!(request.url().to_string().ends_with("/v2/list?bucket=fakebucketname"));
2455 AsyncResponseBody::from_bytes(
2456 [
2457 json_to_string(&json!({
2458 "item": {
2459 "key": "fakeobj1",
2460 "put_time": generate_put_time(),
2461 "hash": "fakeobj1hash",
2462 "fsize": 1usize,
2463 "mime_type": "text/plain",
2464 },
2465 "marker": "fakemarkerobj1",
2466 }))
2467 .unwrap(),
2468 json_to_string(&json!({
2469 "item": {
2470 "key": "fakeobj2",
2471 "put_time": generate_put_time(),
2472 "hash": "fakeobj2hash",
2473 "fsize": 2usize,
2474 "mime_type": "text/plain",
2475 },
2476 "marker": "fakemarkerobj2",
2477 }))
2478 .unwrap(),
2479 ]
2480 .join("\n")
2481 .as_bytes()
2482 .to_owned(),
2483 )
2484 }
2485 _ => unreachable!(),
2486 };
2487 Ok(AsyncResponse::builder()
2488 .status_code(StatusCode::OK)
2489 .header("x-reqid", HeaderValue::from_static("FakeReqid"))
2490 .body(body)
2491 .build())
2492 })
2493 }
2494 }
2495
2496 let counter = Arc::new(AtomicUsize::new(0));
2497 {
2498 let bucket = get_bucket(FakeHttpCaller::default());
2499 let mut stream = bucket
2500 .list()
2501 .version(ListVersion::V2)
2502 .before_request_callback({
2503 let counter = counter.to_owned();
2504 move |_| {
2505 if counter.load(Ordering::Relaxed) > 0 {
2506 Err(AnyError::msg("Fake error"))
2507 } else {
2508 Ok(())
2509 }
2510 }
2511 })
2512 .stream();
2513 let mut iter = (&mut stream).enumerate();
2514 while let Some((i, entry)) = iter.next().await {
2515 if counter.fetch_add(1, Ordering::Relaxed) < 2 {
2516 let entry = entry?;
2517 assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
2518 assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
2519 assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
2520 } else {
2521 let err = entry.unwrap_err();
2522 assert!(matches!(
2523 err.kind(),
2524 ResponseErrorKind::HttpError(HttpResponseErrorKind::CallbackError { .. })
2525 ));
2526 break;
2527 }
2528 }
2529 assert_eq!(stream.marker(), Some("fakemarkerobj2"));
2530 }
2531 assert_eq!(Arc::try_unwrap(counter).unwrap().into_inner(), 3usize);
2532
2533 Ok(())
2534 }
2535
2536 fn get_bucket(caller: impl HttpCaller + 'static) -> Bucket {
2537 let object_manager = ObjectsManager::builder(get_credential())
2538 .http_client(
2539 HttpClient::builder(caller)
2540 .chooser(DirectChooser)
2541 .request_retrier(NeverRetrier)
2542 .backoff(NO_BACKOFF)
2543 .build(),
2544 )
2545 .build();
2546 object_manager.bucket_with_region(get_bucket_name(), single_rsf_domain_region())
2547 }
2548
2549 fn get_credential() -> Credential {
2550 Credential::new("fakeaccesskey", "fakesecretkey")
2551 }
2552
2553 fn get_bucket_name() -> BucketName {
2554 "fakebucketname".into()
2555 }
2556
2557 fn generate_put_time() -> u64 {
2558 SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as u64 / 100
2559 }
2560
2561 fn single_rsf_domain_region() -> Region {
2562 Region::builder("chaotic")
2563 .add_rsf_preferred_endpoint(("fakersf.example.com".to_owned(), 8080).into())
2564 .build()
2565 }
2566}