qiniu_objects_manager/
list.rs

1use super::{callbacks::Callbacks, Bucket};
2use anyhow::Error as AnyError;
3use assert_impl::assert_impl;
4use log::warn;
5use qiniu_apis::{
6    http::ResponseErrorKind as HttpResponseErrorKind,
7    http_client::{ApiResult, RegionsProvider, RegionsProviderEndpoints, Response, ResponseError, SyncResponseBody},
8    storage::get_objects::{
9        ListedObjectEntry, QueryParams, ResponseBody as GetObjectsV1ResponseBody,
10        SyncRequestBuilder as GetObjectsV1SyncRequestBuilder,
11    },
12    storage::get_objects_v2::SyncRequestBuilder as GetObjectsV2SyncRequestBuilder,
13};
14use serde::{Deserialize, Serialize};
15use std::{
16    borrow::Cow,
17    collections::VecDeque,
18    fmt::{self, Debug},
19    io::{BufRead, BufReader, Lines},
20};
21use tap::prelude::*;
22
23#[cfg(feature = "async")]
24use {futures::io::BufReader as AsyncBufReader, qiniu_apis::http_client::AsyncResponseBody};
25
26type RefRegionProviderEndpoints<'a> = RegionsProviderEndpoints<&'a dyn RegionsProvider>;
27
28#[derive(Debug, Clone)]
29struct ListParams<'a> {
30    bucket: &'a Bucket,
31    prefix: Option<Cow<'a, str>>,
32    limit: Limit,
33    marker: Marker<'a>,
34    need_parts: bool,
35}
36
37#[derive(Debug, Clone)]
38enum Marker<'a> {
39    Original(Option<Cow<'a, str>>),
40    Subsequent(Option<String>),
41}
42
43impl<'a> Marker<'a> {
44    fn new(marker: Option<Cow<'a, str>>) -> Self {
45        Self::Original(marker)
46    }
47
48    fn empty(&self) -> bool {
49        matches!(self.as_ref().map(|s| s.is_empty()), Some(true) | None)
50    }
51
52    fn as_ref(&self) -> Option<&str> {
53        match self {
54            Self::Original(marker) => marker.as_deref(),
55            Self::Subsequent(marker) => marker.as_deref(),
56        }
57    }
58
59    fn set(&mut self, marker: Option<&str>) {
60        *self = Self::Subsequent(marker.map(|s| s.to_owned()));
61    }
62
63    fn is_original(&self) -> bool {
64        matches!(self, Self::Original(..))
65    }
66}
67
68#[derive(Copy, Debug, Clone)]
69struct Limit {
70    limit: Option<usize>,
71    max: Option<usize>,
72}
73
74impl Limit {
75    fn new(limit: Option<usize>, version: ListVersion) -> Self {
76        Self {
77            limit,
78            max: version.page_limit(),
79        }
80    }
81
82    fn as_ref(&self) -> Option<usize> {
83        match (self.limit, self.max) {
84            (Some(limit), Some(max)) => Some(limit.min(max)),
85            (Some(limit), None) => Some(limit),
86            (None, Some(max)) => Some(max),
87            (None, None) => None,
88        }
89    }
90
91    fn exhausted(&self) -> bool {
92        matches!(self.limit, Some(0))
93    }
94
95    fn saturating_decrease(&mut self, sub: usize) {
96        if let Some(limit) = self.limit.as_mut() {
97            *limit = limit.saturating_sub(sub);
98        }
99    }
100}
101
102impl<'a> ListParams<'a> {
103    fn to_query_params(&self) -> QueryParams<'a> {
104        let mut query_params = QueryParams::default().set_bucket_as_str(self.bucket.name().as_str());
105        if let Some(marker) = self.marker.as_ref() {
106            query_params = query_params.set_marker_as_str(marker.to_owned());
107        }
108        if let Some(limit) = self.limit.as_ref() {
109            query_params = query_params.set_limit_as_usize(limit);
110        }
111        if let Some(prefix) = self.prefix.as_ref() {
112            query_params = query_params.set_prefix_as_str(prefix.clone());
113        }
114        if self.need_parts {
115            query_params = query_params.set_need_parts_as_bool(true);
116        }
117        query_params
118    }
119
120    fn have_done(&self) -> bool {
121        self.limit.exhausted() || !self.marker.is_original() && self.marker.empty()
122    }
123}
124
125/// 对象列举迭代器
126///
127/// 实现 [`std::iter::Iterator`] 接口,
128/// 在迭代过程中阻塞发起 API 列举对象信息。
129///
130/// 可以通过 [`crate::ListBuilder::iter`] 方法获取该迭代器。
131#[must_use]
132pub struct ListIter<'a> {
133    params: ListParams<'a>,
134    version: SyncListVersionWithStep,
135    callbacks: Callbacks<'a>,
136}
137
138impl ListIter<'_> {
139    /// 获取上一次列举返回的位置标记
140    pub fn marker(&self) -> Option<&str> {
141        self.params.marker.as_ref()
142    }
143}
144
145impl Debug for ListIter<'_> {
146    #[inline]
147    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148        f.debug_struct("ListIter")
149            .field("params", &self.params)
150            .field("version", &self.version)
151            .finish()
152    }
153}
154
155/// 列举 API 版本
156///
157/// 目前支持 V1 和 V2,默认为 V1
158#[derive(Copy, Clone, Debug, Default)]
159#[non_exhaustive]
160pub enum ListVersion {
161    /// 列举 API V1
162    #[default]
163    V1,
164
165    /// 列举 API V2
166    V2,
167}
168
169impl ListVersion {
170    fn page_limit(self) -> Option<usize> {
171        const V1_PAGE_SIZE_MAX: usize = 1000;
172
173        match self {
174            Self::V1 => Some(V1_PAGE_SIZE_MAX),
175            Self::V2 => None,
176        }
177    }
178}
179
180#[derive(Debug)]
181enum SyncListVersionWithStep {
182    V1(SyncListV1Step),
183    V2(SyncListV2Step),
184}
185
186impl From<ListVersion> for SyncListVersionWithStep {
187    fn from(version: ListVersion) -> Self {
188        match version {
189            ListVersion::V1 => Self::V1(Default::default()),
190            ListVersion::V2 => Self::V2(Default::default()),
191        }
192    }
193}
194
195#[derive(Clone, Debug)]
196pub(super) enum SyncListV1Step {
197    Buffer {
198        buffer: VecDeque<ListedObjectEntry>,
199    },
200    Done,
201}
202
203impl Default for SyncListV1Step {
204    #[inline]
205    fn default() -> Self {
206        Self::Buffer { buffer: Default::default() }
207    }
208}
209
210#[derive(Debug, Default)]
211pub(super) enum SyncListV2Step {
212    #[default]
213    Start,
214    Lines {
215        lines: Lines<BufReader<SyncResponseBody>>,
216    },
217    Done,
218}
219
220#[derive(Clone, Debug, Serialize, Deserialize)]
221struct ListedObjectEntryV2 {
222    item: Option<ListedObjectEntry>,
223    marker: Option<String>,
224}
225
226impl<'a> ListIter<'a> {
227    pub(super) fn new(
228        bucket: &'a Bucket,
229        limit: Option<usize>,
230        prefix: Option<Cow<'a, str>>,
231        marker: Option<Cow<'a, str>>,
232        need_parts: bool,
233        version: ListVersion,
234        callbacks: Callbacks<'a>,
235    ) -> Self {
236        Self {
237            callbacks,
238            version: version.into(),
239            params: ListParams {
240                bucket,
241                prefix,
242                need_parts,
243                limit: Limit::new(limit, version),
244                marker: Marker::new(marker),
245            },
246        }
247    }
248
249    #[allow(dead_code)]
250    fn assert() {
251        assert_impl!(Send: Self);
252        // assert_impl!(Sync: Self);
253    }
254}
255
256impl Iterator for ListIter<'_> {
257    type Item = ApiResult<ListedObjectEntry>;
258
259    #[inline]
260    fn next(&mut self) -> Option<Self::Item> {
261        return match &mut self.version {
262            SyncListVersionWithStep::V1(step) => v1_next(&mut self.params, &mut self.callbacks, step),
263            SyncListVersionWithStep::V2(step) => v2_next(&mut self.params, &mut self.callbacks, step),
264        };
265
266        fn v1_next(
267            params: &mut ListParams<'_>,
268            callbacks: &mut Callbacks<'_>,
269            step: &mut SyncListV1Step,
270        ) -> Option<ApiResult<ListedObjectEntry>> {
271            match step {
272                SyncListV1Step::Buffer { buffer } => {
273                    if let Some(object) = buffer.pop_front() {
274                        Some(Ok(object))
275                    } else {
276                        match v1_next_page(params, callbacks, buffer) {
277                            Ok(true) => {
278                                *step = SyncListV1Step::Done;
279                                None
280                            }
281                            Ok(false) => buffer.pop_front().map(Ok),
282                            Err(err) => {
283                                *step = SyncListV1Step::Done;
284                                Some(Err(err))
285                            }
286                        }
287                    }
288                }
289                SyncListV1Step::Done => None,
290            }
291        }
292
293        fn v1_next_page(
294            params: &mut ListParams<'_>,
295            callbacks: &mut Callbacks<'_>,
296            buffer: &mut VecDeque<ListedObjectEntry>,
297        ) -> ApiResult<bool> {
298            let mut have_done = false;
299            if params.have_done() {
300                have_done = true;
301            } else {
302                let request = v1_make_request(params)?;
303                let response_result = v1_call_request(request, callbacks);
304                v1_handle_response(response_result?.into_body(), params, buffer);
305            }
306            Ok(have_done)
307        }
308
309        fn v1_make_request<'a>(
310            params: &mut ListParams<'a>,
311        ) -> ApiResult<GetObjectsV1SyncRequestBuilder<'a, RefRegionProviderEndpoints<'a>>> {
312            let mut request = params
313                .bucket
314                .objects_manager()
315                .client()
316                .storage()
317                .get_objects()
318                .new_request(
319                    RegionsProviderEndpoints::new(params.bucket.region_provider()?),
320                    params.bucket.objects_manager().credential(),
321                );
322            request.query_pairs(params.to_query_params());
323            Ok(request)
324        }
325
326        fn v1_call_request(
327            mut request: GetObjectsV1SyncRequestBuilder<'_, RefRegionProviderEndpoints>,
328            callbacks: &mut Callbacks<'_>,
329        ) -> ApiResult<Response<GetObjectsV1ResponseBody>> {
330            callbacks
331                .before_request(request.parts_mut())
332                .map_err(make_callback_error)?;
333            let mut response_result = request.call();
334            callbacks
335                .after_response(&mut response_result)
336                .map_err(make_callback_error)?;
337            response_result
338        }
339
340        fn v1_handle_response(
341            body: GetObjectsV1ResponseBody,
342            params: &mut ListParams<'_>,
343            buffer: &mut VecDeque<ListedObjectEntry>,
344        ) {
345            params.marker.set(body.get_marker_as_str());
346            let listed_object_entries = body.get_items().to_listed_object_entry_vec();
347            params.limit.saturating_decrease(listed_object_entries.len());
348            *buffer = listed_object_entries.into();
349        }
350
351        fn v2_next(
352            params: &mut ListParams<'_>,
353            callbacks: &mut Callbacks<'_>,
354            step: &mut SyncListV2Step,
355        ) -> Option<ApiResult<ListedObjectEntry>> {
356            match step {
357                SyncListV2Step::Start => match v2_call(params, callbacks) {
358                    Ok(Some(mut lines)) => v2_read_entry_from_lines(params, &mut lines).tap_some(|result| {
359                        if result.is_ok() {
360                            *step = SyncListV2Step::Lines { lines };
361                        } else {
362                            *step = SyncListV2Step::Done;
363                        }
364                    }),
365                    Ok(None) => {
366                        *step = SyncListV2Step::Done;
367                        None
368                    }
369                    Err(err) => {
370                        *step = SyncListV2Step::Done;
371                        Some(Err(err))
372                    }
373                },
374                SyncListV2Step::Lines { lines } => match v2_read_entry_from_lines(params, lines) {
375                    Some(Ok(entry)) => Some(Ok(entry)),
376                    Some(Err(err)) => {
377                        warn!("Read Error from ListV2 Response Body: {}", err);
378                        *step = SyncListV2Step::Start;
379                        v2_next(params, callbacks, step)
380                    }
381                    None => {
382                        *step = SyncListV2Step::Start;
383                        v2_next(params, callbacks, step)
384                    }
385                },
386                SyncListV2Step::Done => None,
387            }
388        }
389
390        fn v2_read_entry_from_lines(
391            params: &mut ListParams<'_>,
392            lines: &mut Lines<BufReader<SyncResponseBody>>,
393        ) -> Option<ApiResult<ListedObjectEntry>> {
394            if params.limit.exhausted() {
395                return None;
396            }
397            loop {
398                match lines.next() {
399                    Some(Ok(line)) if line.is_empty() => {
400                        continue;
401                    }
402                    Some(Ok(line)) => match serde_json::from_str::<ListedObjectEntryV2>(&line) {
403                        Ok(parsed) => {
404                            params.marker.set(parsed.marker.as_deref());
405                            if let Some(item) = parsed.item {
406                                params.limit.saturating_decrease(1);
407                                return Some(Ok(item));
408                            } else {
409                                continue;
410                            }
411                        }
412                        Err(err) => {
413                            return Some(Err(err.into()));
414                        }
415                    },
416                    Some(Err(err)) => {
417                        return Some(Err(err.into()));
418                    }
419                    None => {
420                        return None;
421                    }
422                }
423            }
424        }
425
426        fn v2_call(
427            params: &mut ListParams<'_>,
428            callbacks: &mut Callbacks<'_>,
429        ) -> ApiResult<Option<Lines<BufReader<SyncResponseBody>>>> {
430            if params.have_done() {
431                return Ok(None);
432            }
433            let request = v2_make_request(params)?;
434            let response_result = v2_call_request(request, callbacks);
435            Ok(Some(BufReader::new(response_result?.into_body()).lines()))
436        }
437
438        fn v2_make_request<'a>(
439            params: &mut ListParams<'a>,
440        ) -> ApiResult<GetObjectsV2SyncRequestBuilder<'a, RefRegionProviderEndpoints<'a>>> {
441            let mut request = params
442                .bucket
443                .objects_manager()
444                .client()
445                .storage()
446                .get_objects_v2()
447                .new_request(
448                    RegionsProviderEndpoints::new(params.bucket.region_provider()?),
449                    params.bucket.objects_manager().credential(),
450                );
451            request.query_pairs(params.to_query_params());
452            Ok(request)
453        }
454
455        fn v2_call_request(
456            mut request: GetObjectsV2SyncRequestBuilder<'_, RefRegionProviderEndpoints>,
457            callbacks: &mut Callbacks<'_>,
458        ) -> ApiResult<Response<SyncResponseBody>> {
459            callbacks
460                .before_request(request.parts_mut())
461                .map_err(make_callback_error)?;
462            let mut response_result = request.call();
463            callbacks
464                .after_response(&mut response_result)
465                .map_err(make_callback_error)?;
466            response_result
467        }
468    }
469}
470
471#[cfg(feature = "async")]
472mod async_list_stream {
473    use super::*;
474    use futures::{future::BoxFuture, io::Lines as AsyncLines, ready, AsyncBufReadExt, FutureExt, Stream, StreamExt};
475    use std::{
476        fmt::{self, Debug},
477        io::Result as IOResult,
478        pin::Pin,
479        task::{Context, Poll},
480    };
481
482    enum AsyncListVersionWithStep<'a> {
483        V1(AsyncListV1Step<'a>),
484        V2(AsyncListV2Step<'a>),
485    }
486
487    impl From<ListVersion> for AsyncListVersionWithStep<'_> {
488        fn from(version: ListVersion) -> Self {
489            match version {
490                ListVersion::V1 => Self::V1(Default::default()),
491                ListVersion::V2 => Self::V2(Default::default()),
492            }
493        }
494    }
495
496    enum AsyncListV1Step<'a> {
497        FromBuffer {
498            buffer: VecDeque<ListedObjectEntry>,
499        },
500        WaitForResponse {
501            task: BoxFuture<'a, ApiResult<Response<GetObjectsV1ResponseBody>>>,
502        },
503        WaitForRegionProvider {
504            task: BoxFuture<'a, IOResult<&'a dyn RegionsProvider>>,
505        },
506        Done,
507    }
508
509    impl Default for AsyncListV1Step<'_> {
510        #[inline]
511        fn default() -> Self {
512            Self::FromBuffer { buffer: Default::default() }
513        }
514    }
515
516    impl Debug for AsyncListV1Step<'_> {
517        #[inline]
518        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
519            match self {
520                Self::FromBuffer { buffer } => f.debug_tuple("FromBuffer").field(buffer).finish(),
521                Self::WaitForResponse { .. } => f.debug_tuple("WaitForResponse").finish(),
522                Self::WaitForRegionProvider { .. } => f.debug_tuple("WaitForRegionProvider").finish(),
523                Self::Done => f.debug_tuple("Done").finish(),
524            }
525        }
526    }
527
528    trait StreamWithMarker: Stream {
529        fn marker(&self) -> Option<&str>;
530    }
531    type BoxStreamWithMarker<'a, T> = Pin<Box<dyn StreamWithMarker<Item = T> + Send + 'a>>;
532    type ListedObjectEntryResultStream<'a> = BoxStreamWithMarker<'a, ApiResult<ListedObjectEntry>>;
533
534    /// 对象列举流
535    ///
536    /// 实现 [`futures::stream::Stream`] 接口,
537    /// 在迭代过程中异步发起 API 列举对象信息
538    ///
539    /// 可以通过 [`crate::ListBuilder::stream`] 方法获取该迭代器。
540    #[must_use]
541    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
542    pub struct ListStream<'a>(ListedObjectEntryResultStream<'a>);
543
544    impl<'a> ListStream<'a> {
545        pub(in super::super) fn new(
546            bucket: &'a Bucket,
547            limit: Option<usize>,
548            prefix: Option<Cow<'a, str>>,
549            marker: Option<Cow<'a, str>>,
550            need_parts: bool,
551            version: ListVersion,
552            callbacks: Callbacks<'a>,
553        ) -> Self {
554            Self(match version {
555                ListVersion::V1 => v1_next(bucket, limit, prefix, marker, need_parts, callbacks),
556                ListVersion::V2 => v2_next(bucket, limit, prefix, marker, need_parts, callbacks),
557            })
558        }
559
560        #[allow(dead_code)]
561        fn assert() {
562            assert_impl!(Send: Self);
563            // assert_impl!(Sync: Self);
564        }
565
566        /// 获取上一次列举返回的位置标记
567        pub fn marker(&self) -> Option<&str> {
568            self.0.marker()
569        }
570    }
571
572    impl Stream for ListStream<'_> {
573        type Item = ApiResult<ListedObjectEntry>;
574
575        #[inline]
576        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
577            self.0.poll_next_unpin(cx)
578        }
579    }
580
581    impl Debug for ListStream<'_> {
582        #[inline]
583        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
584            f.debug_struct("ListStream").finish()
585        }
586    }
587
588    #[derive(Debug)]
589    struct ListV1Stream<'a> {
590        params: ListParams<'a>,
591        callbacks: Callbacks<'a>,
592        current_step: AsyncListV1Step<'a>,
593    }
594
595    fn v1_next<'a>(
596        bucket: &'a Bucket,
597        limit: Option<usize>,
598        prefix: Option<Cow<'a, str>>,
599        marker: Option<Cow<'a, str>>,
600        need_parts: bool,
601        callbacks: Callbacks<'a>,
602    ) -> ListedObjectEntryResultStream<'a> {
603        let params = ListParams {
604            bucket,
605            prefix,
606            need_parts,
607            limit: Limit::new(limit, ListVersion::V1),
608            marker: Marker::new(marker),
609        };
610        Box::pin(ListV1Stream {
611            params,
612            callbacks,
613            current_step: Default::default(),
614        })
615    }
616
617    impl Stream for ListV1Stream<'_> {
618        type Item = ApiResult<ListedObjectEntry>;
619
620        #[inline]
621        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
622            match self.current_step {
623                AsyncListV1Step::FromBuffer { .. } => self.read_from_buffer(cx),
624                AsyncListV1Step::WaitForResponse { .. } => self.wait_for_response(cx),
625                AsyncListV1Step::WaitForRegionProvider { .. } => self.wait_for_region(cx),
626                AsyncListV1Step::Done => Poll::Ready(None),
627            }
628        }
629    }
630
631    impl StreamWithMarker for ListV1Stream<'_> {
632        fn marker(&self) -> Option<&str> {
633            self.params.marker.as_ref()
634        }
635    }
636
637    impl ListV1Stream<'_> {
638        fn read_from_buffer(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
639            if let AsyncListV1Step::FromBuffer { buffer } = &mut self.current_step {
640                if let Some(object) = buffer.pop_front() {
641                    Poll::Ready(Some(Ok(object)))
642                } else {
643                    if self.params.have_done() {
644                        self.current_step = AsyncListV1Step::Done;
645                    } else {
646                        let bucket = self.params.bucket;
647                        self.current_step = AsyncListV1Step::WaitForRegionProvider {
648                            task: Box::pin(async move { bucket.async_region_provider().await }),
649                        };
650                    }
651                    self.poll_next(cx)
652                }
653            } else {
654                unreachable!()
655            }
656        }
657
658        fn wait_for_region(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
659            if let AsyncListV1Step::WaitForRegionProvider { task } = &mut self.current_step {
660                match ready!(task.poll_unpin(cx)) {
661                    Ok(region_provider) => {
662                        let credential = self.params.bucket.objects_manager().credential();
663                        let mut request = self
664                            .params
665                            .bucket
666                            .objects_manager()
667                            .client()
668                            .storage()
669                            .get_objects()
670                            .new_async_request(RegionsProviderEndpoints::new(region_provider), credential);
671                        request.query_pairs(self.params.to_query_params());
672                        if let Err(err) = self.callbacks.before_request(request.parts_mut()) {
673                            self.current_step = AsyncListV1Step::Done;
674                            Poll::Ready(Some(Err(make_callback_error(err))))
675                        } else {
676                            self.current_step = AsyncListV1Step::WaitForResponse {
677                                task: Box::pin(async move { request.call().await }),
678                            };
679                            self.poll_next(cx)
680                        }
681                    }
682                    Err(err) => {
683                        self.current_step = AsyncListV1Step::Done;
684                        Poll::Ready(Some(Err(err.into())))
685                    }
686                }
687            } else {
688                unreachable!()
689            }
690        }
691
692        fn wait_for_response(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
693            if let AsyncListV1Step::WaitForResponse { task } = &mut self.current_step {
694                let mut response_result = ready!(task.poll_unpin(cx));
695                if let Err(err) = self.callbacks.after_response(&mut response_result) {
696                    self.current_step = AsyncListV1Step::Done;
697                    Poll::Ready(Some(Err(make_callback_error(err))))
698                } else {
699                    match response_result {
700                        Ok(response) => {
701                            let body = response.into_body();
702                            let listed_object_entries = body.get_items().to_listed_object_entry_vec();
703                            self.params.marker.set(body.get_marker_as_str());
704                            self.params.limit.saturating_decrease(listed_object_entries.len());
705                            self.current_step = AsyncListV1Step::FromBuffer {
706                                buffer: listed_object_entries.into(),
707                            };
708                            self.poll_next(cx)
709                        }
710                        Err(err) => {
711                            self.current_step = AsyncListV1Step::Done;
712                            Poll::Ready(Some(Err(err)))
713                        }
714                    }
715                }
716            } else {
717                unreachable!()
718            }
719        }
720    }
721
722    #[derive(Default)]
723    enum AsyncListV2Step<'a> {
724        #[default]
725        Start,
726        WaitForRegionProvider {
727            task: BoxFuture<'a, IOResult<&'a dyn RegionsProvider>>,
728        },
729        WaitForResponse {
730            task: BoxFuture<'a, ApiResult<Response<AsyncResponseBody>>>,
731        },
732        WaitForEntries {
733            lines: AsyncLines<AsyncBufReader<AsyncResponseBody>>,
734            empty: bool,
735        },
736        Done,
737    }
738
739    struct ListV2Stream<'a> {
740        params: ListParams<'a>,
741        callbacks: Callbacks<'a>,
742        current_step: AsyncListV2Step<'a>,
743    }
744
745    #[allow(clippy::too_many_arguments)]
746    fn v2_next<'a>(
747        bucket: &'a Bucket,
748        limit: Option<usize>,
749        prefix: Option<Cow<'a, str>>,
750        marker: Option<Cow<'a, str>>,
751        need_parts: bool,
752        callbacks: Callbacks<'a>,
753    ) -> ListedObjectEntryResultStream<'a> {
754        let params = ListParams {
755            bucket,
756            prefix,
757            need_parts,
758            limit: Limit::new(limit, ListVersion::V2),
759            marker: Marker::new(marker),
760        };
761        Box::pin(ListV2Stream {
762            params,
763            callbacks,
764            current_step: Default::default(),
765        })
766    }
767
768    impl Stream for ListV2Stream<'_> {
769        type Item = ApiResult<ListedObjectEntry>;
770
771        #[inline]
772        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
773            match self.current_step {
774                AsyncListV2Step::Start { .. } => self.start(cx),
775                AsyncListV2Step::WaitForResponse { .. } => self.wait_for_response(cx),
776                AsyncListV2Step::WaitForRegionProvider { .. } => self.wait_for_region(cx),
777                AsyncListV2Step::WaitForEntries { .. } => self.wait_for_entries(cx),
778                AsyncListV2Step::Done => Poll::Ready(None),
779            }
780        }
781    }
782
783    impl StreamWithMarker for ListV2Stream<'_> {
784        fn marker(&self) -> Option<&str> {
785            self.params.marker.as_ref()
786        }
787    }
788
789    impl ListV2Stream<'_> {
790        fn start(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
791            if let AsyncListV2Step::Start { .. } = &mut self.current_step {
792                if self.params.have_done() {
793                    self.current_step = AsyncListV2Step::Done;
794                } else {
795                    let bucket = self.params.bucket;
796                    self.current_step = AsyncListV2Step::WaitForRegionProvider {
797                        task: Box::pin(async move { bucket.async_region_provider().await }),
798                    };
799                }
800                self.poll_next(cx)
801            } else {
802                unreachable!()
803            }
804        }
805
806        fn wait_for_region(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
807            if let AsyncListV2Step::WaitForRegionProvider { task } = &mut self.current_step {
808                match ready!(task.poll_unpin(cx)) {
809                    Ok(region_provider) => {
810                        let credential = self.params.bucket.objects_manager().credential();
811                        let mut request = self
812                            .params
813                            .bucket
814                            .objects_manager()
815                            .client()
816                            .storage()
817                            .get_objects_v2()
818                            .new_async_request(RegionsProviderEndpoints::new(region_provider), credential);
819                        request.query_pairs(self.params.to_query_params());
820                        if let Err(err) = self.callbacks.before_request(request.parts_mut()) {
821                            self.current_step = AsyncListV2Step::Done;
822                            Poll::Ready(Some(Err(make_callback_error(err))))
823                        } else {
824                            self.current_step = AsyncListV2Step::WaitForResponse {
825                                task: Box::pin(async move { request.call().await }),
826                            };
827                            self.poll_next(cx)
828                        }
829                    }
830                    Err(err) => {
831                        self.current_step = AsyncListV2Step::Done;
832                        Poll::Ready(Some(Err(err.into())))
833                    }
834                }
835            } else {
836                unreachable!()
837            }
838        }
839
840        fn wait_for_response(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
841            if let AsyncListV2Step::WaitForResponse { task } = &mut self.current_step {
842                let mut response_result = ready!(task.poll_unpin(cx));
843                if let Err(err) = self.callbacks.after_response(&mut response_result) {
844                    self.current_step = AsyncListV2Step::Done;
845                    Poll::Ready(Some(Err(make_callback_error(err))))
846                } else {
847                    match response_result {
848                        Ok(response) => {
849                            self.current_step = AsyncListV2Step::WaitForEntries {
850                                lines: AsyncBufReader::new(response.into_body()).lines(),
851                                empty: true,
852                            };
853                            self.poll_next(cx)
854                        }
855                        Err(err) => {
856                            self.current_step = AsyncListV2Step::Done;
857                            Poll::Ready(Some(Err(err)))
858                        }
859                    }
860                }
861            } else {
862                unreachable!()
863            }
864        }
865
866        fn wait_for_entries(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
867            if let AsyncListV2Step::WaitForEntries { lines, ref mut empty } = &mut self.current_step {
868                match ready!(lines.poll_next_unpin(cx)) {
869                    Some(Ok(line)) if line.is_empty() => self.wait_for_entries(cx),
870                    Some(Ok(line)) => match serde_json::from_str::<ListedObjectEntryV2>(&line) {
871                        Ok(parsed) => {
872                            *empty = false;
873                            self.params.marker.set(parsed.marker.as_deref());
874                            if let Some(item) = parsed.item {
875                                self.params.limit.saturating_decrease(1);
876                                Poll::Ready(Some(Ok(item)))
877                            } else {
878                                self.wait_for_entries(cx)
879                            }
880                        }
881                        Err(err) => {
882                            self.current_step = AsyncListV2Step::Done;
883                            Poll::Ready(Some(Err(err.into())))
884                        }
885                    },
886                    Some(Err(err)) => {
887                        self.current_step = AsyncListV2Step::Done;
888                        Poll::Ready(Some(Err(err.into())))
889                    }
890                    None if *empty => {
891                        self.current_step = AsyncListV2Step::Done;
892                        Poll::Ready(None)
893                    }
894                    None => {
895                        self.current_step = AsyncListV2Step::Start;
896                        self.poll_next(cx)
897                    }
898                }
899            } else {
900                unreachable!()
901            }
902        }
903    }
904}
905
906pub(super) fn make_callback_error(err: AnyError) -> ResponseError {
907    ResponseError::new_with_msg(HttpResponseErrorKind::CallbackError.into(), err)
908}
909
910#[cfg(feature = "async")]
911pub use async_list_stream::*;
912
913#[cfg(test)]
914mod tests {
915    use super::{super::ObjectsManager, *};
916    use anyhow::Error as AnyError;
917    use qiniu_apis::{
918        credential::Credential,
919        http::{HeaderValue, HttpCaller, StatusCode, SyncRequest, SyncResponse, SyncResponseResult},
920        http_client::{BucketName, DirectChooser, HttpClient, NeverRetrier, Region, ResponseErrorKind, NO_BACKOFF},
921    };
922    use serde_json::{json, to_string as json_to_string, to_vec as json_to_vec};
923    use std::{
924        sync::{
925            atomic::{AtomicUsize, Ordering},
926            Arc,
927        },
928        time::{SystemTime, UNIX_EPOCH},
929    };
930
931    #[cfg(feature = "async")]
932    use {
933        futures::{future::BoxFuture, StreamExt, TryStreamExt},
934        qiniu_apis::http::{AsyncRequest, AsyncResponse, AsyncResponseResult},
935    };
936
937    #[test]
938    fn test_sync_list_v1() -> anyhow::Result<()> {
939        env_logger::builder().is_test(true).try_init().ok();
940
941        #[derive(Debug, Default)]
942        struct FakeHttpCaller {
943            counter: AtomicUsize,
944        }
945
946        impl HttpCaller for FakeHttpCaller {
947            fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
948                let n = self.counter.fetch_add(1, Ordering::SeqCst);
949                let body = match n {
950                    0 => {
951                        assert!(request
952                            .url()
953                            .to_string()
954                            .ends_with("/list?bucket=fakebucketname&limit=1000"));
955                        SyncResponseBody::from_bytes(
956                            json_to_vec(&json!({
957                                "marker": "fakemarker",
958                                "items": [{
959                                    "key": "fakeobj1",
960                                    "put_time": generate_put_time(),
961                                    "hash": "fakeobj1hash",
962                                    "fsize": 1usize,
963                                    "mime_type": "text/plain",
964                                }, {
965                                    "key": "fakeobj2",
966                                    "put_time": generate_put_time(),
967                                    "hash": "fakeobj2hash",
968                                    "fsize": 2usize,
969                                    "mime_type": "text/plain",
970                                }]
971                            }))
972                            .unwrap(),
973                        )
974                    }
975                    1 => {
976                        assert!(request
977                            .url()
978                            .to_string()
979                            .ends_with("/list?bucket=fakebucketname&marker=fakemarker&limit=1000"));
980                        SyncResponseBody::from_bytes(
981                            json_to_vec(&json!({
982                                "marker": "",
983                                "items": [{
984                                    "key": "fakeobj3",
985                                    "put_time": generate_put_time(),
986                                    "hash": "fakeobj3hash",
987                                    "fsize": 3usize,
988                                    "mime_type": "text/plain",
989                                }, {
990                                    "key": "fakeobj4",
991                                    "put_time": generate_put_time(),
992                                    "hash": "fakeobj4hash",
993                                    "fsize": 4usize,
994                                    "mime_type": "text/plain",
995                                }]
996                            }))
997                            .unwrap(),
998                        )
999                    }
1000                    _ => unreachable!(),
1001                };
1002                Ok(SyncResponse::builder()
1003                    .status_code(StatusCode::OK)
1004                    .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1005                    .body(body)
1006                    .build())
1007            }
1008
1009            #[cfg(feature = "async")]
1010            fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
1011                unreachable!()
1012            }
1013        }
1014
1015        let mut counter = 0usize;
1016        let bucket = get_bucket(FakeHttpCaller::default());
1017        let mut iter = bucket.list().version(ListVersion::V1).iter();
1018        for (i, entry) in (&mut iter).enumerate() {
1019            counter += 1;
1020            let entry = entry?;
1021            assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
1022            assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
1023            assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
1024        }
1025        assert_eq!(iter.marker(), Some(""));
1026        assert_eq!(counter, 4usize);
1027
1028        Ok(())
1029    }
1030
1031    #[test]
1032    fn test_sync_list_v1_with_error() -> anyhow::Result<()> {
1033        env_logger::builder().is_test(true).try_init().ok();
1034
1035        #[derive(Debug, Default)]
1036        struct FakeHttpCaller {
1037            counter: AtomicUsize,
1038        }
1039
1040        impl HttpCaller for FakeHttpCaller {
1041            fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
1042                let n = self.counter.fetch_add(1, Ordering::SeqCst);
1043                let (code, body) = match n {
1044                    0 => {
1045                        assert!(request
1046                            .url()
1047                            .to_string()
1048                            .ends_with("/list?bucket=fakebucketname&limit=1000"));
1049                        (
1050                            StatusCode::OK,
1051                            SyncResponseBody::from_bytes(
1052                                json_to_vec(&json!({
1053                                    "marker": "fakemarker",
1054                                    "items": [{
1055                                        "key": "fakeobj1",
1056                                        "put_time": generate_put_time(),
1057                                        "hash": "fakeobj1hash",
1058                                        "fsize": 1usize,
1059                                        "mime_type": "text/plain",
1060                                    }, {
1061                                        "key": "fakeobj2",
1062                                        "put_time": generate_put_time(),
1063                                        "hash": "fakeobj2hash",
1064                                        "fsize": 2usize,
1065                                        "mime_type": "text/plain",
1066                                    }]
1067                                }))
1068                                .unwrap(),
1069                            ),
1070                        )
1071                    }
1072                    1 => {
1073                        assert!(request
1074                            .url()
1075                            .to_string()
1076                            .ends_with("/list?bucket=fakebucketname&marker=fakemarker&limit=1000"));
1077                        (
1078                            StatusCode::from_u16(599).unwrap(),
1079                            SyncResponseBody::from_bytes(
1080                                json_to_vec(&json!({
1081                                    "error": "Test Error"
1082                                }))
1083                                .unwrap(),
1084                            ),
1085                        )
1086                    }
1087                    _ => unreachable!(),
1088                };
1089                Ok(SyncResponse::builder()
1090                    .status_code(code)
1091                    .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1092                    .body(body)
1093                    .build())
1094            }
1095
1096            #[cfg(feature = "async")]
1097            fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
1098                unreachable!()
1099            }
1100        }
1101
1102        let before_request_callback_counter = Arc::new(AtomicUsize::new(0));
1103        let after_response_ok_callback_counter = Arc::new(AtomicUsize::new(0));
1104        let after_response_error_callback_counter = Arc::new(AtomicUsize::new(0));
1105        let bucket = get_bucket(FakeHttpCaller::default());
1106        let mut iter = bucket
1107            .list()
1108            .version(ListVersion::V1)
1109            .before_request_callback({
1110                let before_request_callback_counter = before_request_callback_counter.to_owned();
1111                move |_| {
1112                    before_request_callback_counter.fetch_add(1, Ordering::Relaxed);
1113                    Ok(())
1114                }
1115            })
1116            .after_response_ok_callback({
1117                let after_response_ok_callback_counter = after_response_ok_callback_counter.to_owned();
1118                move |_| {
1119                    after_response_ok_callback_counter.fetch_add(1, Ordering::Relaxed);
1120                    Ok(())
1121                }
1122            })
1123            .after_response_error_callback({
1124                let after_response_error_callback_counter = after_response_error_callback_counter.to_owned();
1125                move |_| {
1126                    after_response_error_callback_counter.fetch_add(1, Ordering::Relaxed);
1127                    Ok(())
1128                }
1129            })
1130            .iter();
1131        let mut entry = iter.next().unwrap()?;
1132        assert_eq!(entry.get_key_as_str(), "fakeobj1");
1133        assert_eq!(entry.get_hash_as_str(), "fakeobj1hash");
1134        assert_eq!(entry.get_size_as_u64(), 1u64);
1135
1136        entry = iter.next().unwrap()?;
1137        assert_eq!(entry.get_key_as_str(), "fakeobj2");
1138        assert_eq!(entry.get_hash_as_str(), "fakeobj2hash");
1139        assert_eq!(entry.get_size_as_u64(), 2u64);
1140
1141        let err = iter.next().unwrap().unwrap_err();
1142        assert_eq!(
1143            err.kind(),
1144            ResponseErrorKind::StatusCodeError(StatusCode::from_u16(599)?)
1145        );
1146        assert!(iter.next().is_none());
1147        assert_eq!(iter.marker(), Some("fakemarker"));
1148        assert_eq!(before_request_callback_counter.load(Ordering::Relaxed), 2usize);
1149        assert_eq!(after_response_ok_callback_counter.load(Ordering::Relaxed), 1usize);
1150        assert_eq!(after_response_error_callback_counter.load(Ordering::Relaxed), 1usize);
1151
1152        Ok(())
1153    }
1154
1155    #[test]
1156    fn test_sync_list_v1_with_prefix_and_limitation() -> anyhow::Result<()> {
1157        env_logger::builder().is_test(true).try_init().ok();
1158
1159        #[derive(Debug, Default)]
1160        struct FakeHttpCaller {
1161            counter: AtomicUsize,
1162        }
1163
1164        impl HttpCaller for FakeHttpCaller {
1165            fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
1166                let n = self.counter.fetch_add(1, Ordering::SeqCst);
1167                let body = match n {
1168                    0 => {
1169                        assert!(request
1170                            .url()
1171                            .to_string()
1172                            .ends_with("/list?bucket=fakebucketname&limit=3&prefix=fakeobj"));
1173                        SyncResponseBody::from_bytes(
1174                            json_to_vec(&json!({
1175                                "marker": "fakemarker",
1176                                "items": [{
1177                                    "key": "fakeobj1",
1178                                    "put_time": generate_put_time(),
1179                                    "hash": "fakeobj1hash",
1180                                    "fsize": 1usize,
1181                                    "mime_type": "text/plain",
1182                                }, {
1183                                    "key": "fakeobj2",
1184                                    "put_time": generate_put_time(),
1185                                    "hash": "fakeobj2hash",
1186                                    "fsize": 2usize,
1187                                    "mime_type": "text/plain",
1188                                }]
1189                            }))
1190                            .unwrap(),
1191                        )
1192                    }
1193                    1 => {
1194                        assert!(request
1195                            .url()
1196                            .to_string()
1197                            .ends_with("/list?bucket=fakebucketname&marker=fakemarker&limit=1&prefix=fakeobj"));
1198                        SyncResponseBody::from_bytes(
1199                            json_to_vec(&json!({
1200                                "marker": "",
1201                                "items": [{
1202                                    "key": "fakeobj3",
1203                                    "put_time": generate_put_time(),
1204                                    "hash": "fakeobj3hash",
1205                                    "fsize": 3usize,
1206                                    "mime_type": "text/plain",
1207                                }]
1208                            }))
1209                            .unwrap(),
1210                        )
1211                    }
1212                    _ => unreachable!(),
1213                };
1214                Ok(SyncResponse::builder()
1215                    .status_code(StatusCode::OK)
1216                    .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1217                    .body(body)
1218                    .build())
1219            }
1220
1221            #[cfg(feature = "async")]
1222            fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
1223                unreachable!()
1224            }
1225        }
1226
1227        let mut counter = 0usize;
1228        let bucket = get_bucket(FakeHttpCaller::default());
1229        let mut iter = bucket.list().version(ListVersion::V1).prefix("fakeobj").limit(3).iter();
1230        for (i, entry) in (&mut iter).enumerate() {
1231            counter += 1;
1232            let entry = entry?;
1233            assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
1234            assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
1235            assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
1236        }
1237        assert_eq!(iter.marker(), Some(""));
1238        assert_eq!(counter, 3usize);
1239
1240        Ok(())
1241    }
1242
1243    #[test]
1244    fn test_sync_list_v1_with_cancellation() -> anyhow::Result<()> {
1245        env_logger::builder().is_test(true).try_init().ok();
1246
1247        #[derive(Debug, Default)]
1248        struct FakeHttpCaller {
1249            counter: AtomicUsize,
1250        }
1251
1252        impl HttpCaller for FakeHttpCaller {
1253            fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
1254                let n = self.counter.fetch_add(1, Ordering::SeqCst);
1255                let body = match n {
1256                    0 => {
1257                        assert!(request
1258                            .url()
1259                            .to_string()
1260                            .ends_with("/list?bucket=fakebucketname&limit=1000"));
1261                        SyncResponseBody::from_bytes(
1262                            json_to_vec(&json!({
1263                                "marker": "fakemarker",
1264                                "items": [{
1265                                    "key": "fakeobj1",
1266                                    "put_time": generate_put_time(),
1267                                    "hash": "fakeobj1hash",
1268                                    "fsize": 1usize,
1269                                    "mime_type": "text/plain",
1270                                }, {
1271                                    "key": "fakeobj2",
1272                                    "put_time": generate_put_time(),
1273                                    "hash": "fakeobj2hash",
1274                                    "fsize": 2usize,
1275                                    "mime_type": "text/plain",
1276                                }]
1277                            }))
1278                            .unwrap(),
1279                        )
1280                    }
1281                    _ => unreachable!(),
1282                };
1283                Ok(SyncResponse::builder()
1284                    .status_code(StatusCode::OK)
1285                    .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1286                    .body(body)
1287                    .build())
1288            }
1289
1290            #[cfg(feature = "async")]
1291            fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
1292                unreachable!()
1293            }
1294        }
1295
1296        let counter = Arc::new(AtomicUsize::new(0));
1297        let bucket = get_bucket(FakeHttpCaller::default());
1298        {
1299            let mut iter = bucket
1300                .list()
1301                .version(ListVersion::V1)
1302                .before_request_callback({
1303                    let counter = counter.to_owned();
1304                    move |_| {
1305                        if counter.load(Ordering::Relaxed) > 0 {
1306                            Err(AnyError::msg("Fake error"))
1307                        } else {
1308                            Ok(())
1309                        }
1310                    }
1311                })
1312                .iter();
1313            for (i, entry) in (&mut iter).enumerate() {
1314                if counter.fetch_add(1, Ordering::Relaxed) < 2 {
1315                    let entry = entry?;
1316                    assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
1317                    assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
1318                    assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
1319                } else {
1320                    let err = entry.unwrap_err();
1321                    assert!(matches!(
1322                        err.kind(),
1323                        ResponseErrorKind::HttpError(HttpResponseErrorKind::CallbackError { .. })
1324                    ));
1325                    break;
1326                }
1327            }
1328            assert_eq!(iter.marker(), Some("fakemarker"));
1329        }
1330        assert_eq!(Arc::try_unwrap(counter).unwrap().into_inner(), 3usize);
1331
1332        Ok(())
1333    }
1334
1335    #[test]
1336    fn test_sync_list_v2() -> anyhow::Result<()> {
1337        env_logger::builder().is_test(true).try_init().ok();
1338
1339        #[derive(Debug, Default)]
1340        struct FakeHttpCaller {
1341            counter: AtomicUsize,
1342        }
1343
1344        impl HttpCaller for FakeHttpCaller {
1345            fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
1346                let n = self.counter.fetch_add(1, Ordering::SeqCst);
1347                let body = match n {
1348                    0 => {
1349                        assert!(request.url().to_string().ends_with("/v2/list?bucket=fakebucketname"));
1350                        SyncResponseBody::from_bytes(
1351                            [
1352                                json_to_string(&json!({
1353                                    "item": {
1354                                        "key": "fakeobj1",
1355                                        "put_time": generate_put_time(),
1356                                        "hash": "fakeobj1hash",
1357                                        "fsize": 1usize,
1358                                        "mime_type": "text/plain",
1359                                    },
1360                                    "marker": "fakemarkerobj1",
1361                                }))
1362                                .unwrap(),
1363                                json_to_string(&json!({
1364                                    "item": {
1365                                        "key": "fakeobj2",
1366                                        "put_time": generate_put_time(),
1367                                        "hash": "fakeobj2hash",
1368                                        "fsize": 2usize,
1369                                        "mime_type": "text/plain",
1370                                    },
1371                                    "marker": "fakemarkerobj2",
1372                                }))
1373                                .unwrap(),
1374                            ]
1375                            .join("\n")
1376                            .as_bytes()
1377                            .to_owned(),
1378                        )
1379                    }
1380                    1 => {
1381                        assert!(request
1382                            .url()
1383                            .to_string()
1384                            .ends_with("/list?bucket=fakebucketname&marker=fakemarkerobj2"));
1385                        SyncResponseBody::from_bytes(
1386                            [
1387                                json_to_string(&json!({
1388                                    "item": {
1389                                        "key": "fakeobj3",
1390                                        "put_time": generate_put_time(),
1391                                        "hash": "fakeobj3hash",
1392                                        "fsize": 3usize,
1393                                        "mime_type": "text/plain",
1394                                    },
1395                                    "marker": "fakemarkerobj3",
1396                                }))
1397                                .unwrap(),
1398                                json_to_string(&json!({
1399                                    "item": {
1400                                        "key": "fakeobj4",
1401                                        "put_time": generate_put_time(),
1402                                        "hash": "fakeobj4hash",
1403                                        "fsize": 4usize,
1404                                        "mime_type": "text/plain",
1405                                    },
1406                                    "marker": "",
1407                                }))
1408                                .unwrap(),
1409                            ]
1410                            .join("\n")
1411                            .as_bytes()
1412                            .to_owned(),
1413                        )
1414                    }
1415                    _ => unreachable!(),
1416                };
1417                Ok(SyncResponse::builder()
1418                    .status_code(StatusCode::OK)
1419                    .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1420                    .body(body)
1421                    .build())
1422            }
1423
1424            #[cfg(feature = "async")]
1425            fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
1426                unreachable!()
1427            }
1428        }
1429
1430        let mut counter = 0usize;
1431        let bucket = get_bucket(FakeHttpCaller::default());
1432        let mut iter = bucket.list().version(ListVersion::V2).iter();
1433        for (i, entry) in (&mut iter).enumerate() {
1434            counter += 1;
1435            let entry = entry?;
1436            assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
1437            assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
1438            assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
1439        }
1440        assert_eq!(counter, 4usize);
1441        assert_eq!(iter.marker(), Some(""));
1442
1443        Ok(())
1444    }
1445
1446    #[test]
1447    fn test_sync_list_v2_with_non_results() -> anyhow::Result<()> {
1448        env_logger::builder().is_test(true).try_init().ok();
1449
1450        #[derive(Debug, Default)]
1451        struct FakeHttpCaller {
1452            counter: AtomicUsize,
1453        }
1454
1455        impl HttpCaller for FakeHttpCaller {
1456            fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
1457                let n = self.counter.fetch_add(1, Ordering::SeqCst);
1458                let body = match n {
1459                    0 => {
1460                        assert!(request
1461                            .url()
1462                            .to_string()
1463                            .ends_with("/v2/list?bucket=fakebucketname&prefix=non-existed"));
1464                        SyncResponseBody::from_bytes(Vec::new())
1465                    }
1466                    _ => unreachable!(),
1467                };
1468                Ok(SyncResponse::builder()
1469                    .status_code(StatusCode::OK)
1470                    .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1471                    .body(body)
1472                    .build())
1473            }
1474
1475            #[cfg(feature = "async")]
1476            fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
1477                unreachable!()
1478            }
1479        }
1480
1481        let mut counter = 0usize;
1482        let bucket = get_bucket(FakeHttpCaller::default());
1483        let mut iter = bucket.list().version(ListVersion::V2).prefix("non-existed").iter();
1484        for _entry in &mut iter {
1485            counter += 1;
1486        }
1487        assert_eq!(counter, 0usize);
1488        assert_eq!(iter.marker(), None);
1489
1490        Ok(())
1491    }
1492
1493    #[test]
1494    fn test_sync_list_v2_with_error() -> anyhow::Result<()> {
1495        env_logger::builder().is_test(true).try_init().ok();
1496
1497        #[derive(Debug, Default)]
1498        struct FakeHttpCaller {
1499            counter: AtomicUsize,
1500        }
1501
1502        impl HttpCaller for FakeHttpCaller {
1503            fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
1504                let n = self.counter.fetch_add(1, Ordering::SeqCst);
1505                let (code, body) = match n {
1506                    0 => {
1507                        assert!(request.url().to_string().ends_with("/v2/list?bucket=fakebucketname"));
1508                        (
1509                            StatusCode::OK,
1510                            SyncResponseBody::from_bytes(
1511                                [
1512                                    json_to_string(&json!({
1513                                        "item": {
1514                                            "key": "fakeobj1",
1515                                            "put_time": generate_put_time(),
1516                                            "hash": "fakeobj1hash",
1517                                            "fsize": 1usize,
1518                                            "mime_type": "text/plain",
1519                                        },
1520                                        "marker": "fakemarkerobj1",
1521                                    }))
1522                                    .unwrap(),
1523                                    json_to_string(&json!({
1524                                        "item": {
1525                                            "key": "fakeobj2",
1526                                            "put_time": generate_put_time(),
1527                                            "hash": "fakeobj2hash",
1528                                            "fsize": 2usize,
1529                                            "mime_type": "text/plain",
1530                                        },
1531                                        "marker": "fakemarkerobj2",
1532                                    }))
1533                                    .unwrap(),
1534                                ]
1535                                .join("\n")
1536                                .as_bytes()
1537                                .to_owned(),
1538                            ),
1539                        )
1540                    }
1541                    1 => {
1542                        assert!(request
1543                            .url()
1544                            .to_string()
1545                            .ends_with("/v2/list?bucket=fakebucketname&marker=fakemarkerobj2"));
1546                        (
1547                            StatusCode::from_u16(599).unwrap(),
1548                            SyncResponseBody::from_bytes(
1549                                json_to_vec(&json!({
1550                                    "error": "Test Error"
1551                                }))
1552                                .unwrap(),
1553                            ),
1554                        )
1555                    }
1556                    _ => unreachable!(),
1557                };
1558                Ok(SyncResponse::builder()
1559                    .status_code(code)
1560                    .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1561                    .body(body)
1562                    .build())
1563            }
1564
1565            #[cfg(feature = "async")]
1566            fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
1567                unreachable!()
1568            }
1569        }
1570
1571        let before_request_callback_counter = Arc::new(AtomicUsize::new(0));
1572        let after_response_ok_callback_counter = Arc::new(AtomicUsize::new(0));
1573        let after_response_error_callback_counter = Arc::new(AtomicUsize::new(0));
1574        let bucket = get_bucket(FakeHttpCaller::default());
1575        let mut iter = bucket
1576            .list()
1577            .version(ListVersion::V2)
1578            .before_request_callback({
1579                let before_request_callback_counter = before_request_callback_counter.to_owned();
1580                move |_| {
1581                    before_request_callback_counter.fetch_add(1, Ordering::Relaxed);
1582                    Ok(())
1583                }
1584            })
1585            .after_response_ok_callback({
1586                let after_response_ok_callback_counter = after_response_ok_callback_counter.to_owned();
1587                move |_| {
1588                    after_response_ok_callback_counter.fetch_add(1, Ordering::Relaxed);
1589                    Ok(())
1590                }
1591            })
1592            .after_response_error_callback({
1593                let after_response_error_callback_counter = after_response_error_callback_counter.to_owned();
1594                move |_| {
1595                    after_response_error_callback_counter.fetch_add(1, Ordering::Relaxed);
1596                    Ok(())
1597                }
1598            })
1599            .iter();
1600        let mut entry = iter.next().unwrap()?;
1601        assert_eq!(entry.get_key_as_str(), "fakeobj1");
1602        assert_eq!(entry.get_hash_as_str(), "fakeobj1hash");
1603        assert_eq!(entry.get_size_as_u64(), 1u64);
1604
1605        entry = iter.next().unwrap()?;
1606        assert_eq!(entry.get_key_as_str(), "fakeobj2");
1607        assert_eq!(entry.get_hash_as_str(), "fakeobj2hash");
1608        assert_eq!(entry.get_size_as_u64(), 2u64);
1609
1610        let err = iter.next().unwrap().unwrap_err();
1611        assert_eq!(
1612            err.kind(),
1613            ResponseErrorKind::StatusCodeError(StatusCode::from_u16(599)?)
1614        );
1615        assert!(iter.next().is_none());
1616        assert_eq!(iter.marker(), Some("fakemarkerobj2"));
1617        assert_eq!(before_request_callback_counter.load(Ordering::Relaxed), 2usize);
1618        assert_eq!(after_response_ok_callback_counter.load(Ordering::Relaxed), 1usize);
1619        assert_eq!(after_response_error_callback_counter.load(Ordering::Relaxed), 1usize);
1620
1621        Ok(())
1622    }
1623
1624    #[test]
1625    fn test_sync_list_v2_with_cancellation() -> anyhow::Result<()> {
1626        env_logger::builder().is_test(true).try_init().ok();
1627
1628        #[derive(Debug, Default)]
1629        struct FakeHttpCaller {
1630            counter: AtomicUsize,
1631        }
1632
1633        impl HttpCaller for FakeHttpCaller {
1634            fn call(&self, request: &mut SyncRequest<'_>) -> SyncResponseResult {
1635                let n = self.counter.fetch_add(1, Ordering::SeqCst);
1636                let body = match n {
1637                    0 => {
1638                        assert!(request.url().to_string().ends_with("/v2/list?bucket=fakebucketname"));
1639                        SyncResponseBody::from_bytes(
1640                            [
1641                                json_to_string(&json!({
1642                                    "item": {
1643                                        "key": "fakeobj1",
1644                                        "put_time": generate_put_time(),
1645                                        "hash": "fakeobj1hash",
1646                                        "fsize": 1usize,
1647                                        "mime_type": "text/plain",
1648                                    },
1649                                    "marker": "fakemarkerobj1",
1650                                }))
1651                                .unwrap(),
1652                                json_to_string(&json!({
1653                                    "item": {
1654                                        "key": "fakeobj2",
1655                                        "put_time": generate_put_time(),
1656                                        "hash": "fakeobj2hash",
1657                                        "fsize": 2usize,
1658                                        "mime_type": "text/plain",
1659                                    },
1660                                    "marker": "fakemarkerobj2",
1661                                }))
1662                                .unwrap(),
1663                            ]
1664                            .join("\n")
1665                            .as_bytes()
1666                            .to_owned(),
1667                        )
1668                    }
1669                    _ => unreachable!(),
1670                };
1671                Ok(SyncResponse::builder()
1672                    .status_code(StatusCode::OK)
1673                    .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1674                    .body(body)
1675                    .build())
1676            }
1677
1678            #[cfg(feature = "async")]
1679            fn async_call(&self, _request: &mut AsyncRequest<'_>) -> BoxFuture<AsyncResponseResult> {
1680                unreachable!()
1681            }
1682        }
1683
1684        let counter = Arc::new(AtomicUsize::new(0));
1685        let bucket = get_bucket(FakeHttpCaller::default());
1686        {
1687            let mut iter = bucket
1688                .list()
1689                .version(ListVersion::V2)
1690                .before_request_callback({
1691                    let counter = counter.to_owned();
1692                    move |_| {
1693                        if counter.load(Ordering::Relaxed) > 0 {
1694                            Err(AnyError::msg("Fake error"))
1695                        } else {
1696                            Ok(())
1697                        }
1698                    }
1699                })
1700                .iter();
1701            for (i, entry) in (&mut iter).enumerate() {
1702                if counter.fetch_add(1, Ordering::Relaxed) < 2 {
1703                    let entry = entry?;
1704                    assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
1705                    assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
1706                    assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
1707                } else {
1708                    let err = entry.unwrap_err();
1709                    assert!(matches!(
1710                        err.kind(),
1711                        ResponseErrorKind::HttpError(HttpResponseErrorKind::CallbackError { .. })
1712                    ));
1713                    break;
1714                }
1715            }
1716            assert_eq!(iter.marker(), Some("fakemarkerobj2"));
1717        }
1718        assert_eq!(Arc::try_unwrap(counter).unwrap().into_inner(), 3usize);
1719
1720        Ok(())
1721    }
1722
1723    #[async_std::test]
1724    #[cfg(feature = "async")]
1725    async fn test_async_list_v1() -> anyhow::Result<()> {
1726        env_logger::builder().is_test(true).try_init().ok();
1727
1728        #[derive(Debug, Default)]
1729        struct FakeHttpCaller {
1730            counter: AtomicUsize,
1731        }
1732
1733        impl HttpCaller for FakeHttpCaller {
1734            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
1735                unreachable!()
1736            }
1737
1738            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
1739                Box::pin(async move {
1740                    let n = self.counter.fetch_add(1, Ordering::SeqCst);
1741                    let body = match n {
1742                        0 => {
1743                            assert!(request
1744                                .url()
1745                                .to_string()
1746                                .ends_with("/list?bucket=fakebucketname&limit=1000"));
1747                            AsyncResponseBody::from_bytes(
1748                                json_to_vec(&json!({
1749                                    "marker": "fakemarker",
1750                                    "items": [{
1751                                        "key": "fakeobj1",
1752                                        "put_time": generate_put_time(),
1753                                        "hash": "fakeobj1hash",
1754                                        "fsize": 1usize,
1755                                        "mime_type": "text/plain",
1756                                    }, {
1757                                        "key": "fakeobj2",
1758                                        "put_time": generate_put_time(),
1759                                        "hash": "fakeobj2hash",
1760                                        "fsize": 2usize,
1761                                        "mime_type": "text/plain",
1762                                    }]
1763                                }))
1764                                .unwrap(),
1765                            )
1766                        }
1767                        1 => {
1768                            assert!(request
1769                                .url()
1770                                .to_string()
1771                                .ends_with("/list?bucket=fakebucketname&marker=fakemarker&limit=1000"));
1772                            AsyncResponseBody::from_bytes(
1773                                json_to_vec(&json!({
1774                                    "marker": "",
1775                                    "items": [{
1776                                        "key": "fakeobj3",
1777                                        "put_time": generate_put_time(),
1778                                        "hash": "fakeobj3hash",
1779                                        "fsize": 3usize,
1780                                        "mime_type": "text/plain",
1781                                    }, {
1782                                        "key": "fakeobj4",
1783                                        "put_time": generate_put_time(),
1784                                        "hash": "fakeobj4hash",
1785                                        "fsize": 4usize,
1786                                        "mime_type": "text/plain",
1787                                    }]
1788                                }))
1789                                .unwrap(),
1790                            )
1791                        }
1792                        _ => unreachable!(),
1793                    };
1794                    Ok(AsyncResponse::builder()
1795                        .status_code(StatusCode::OK)
1796                        .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1797                        .body(body)
1798                        .build())
1799                })
1800            }
1801        }
1802
1803        let mut counter = 0usize;
1804        let bucket = get_bucket(FakeHttpCaller::default());
1805        let mut stream = bucket.list().version(ListVersion::V1).stream();
1806        let mut iter = (&mut stream).enumerate();
1807        while let Some((i, entry)) = iter.next().await {
1808            counter += 1;
1809            let entry = entry?;
1810            assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
1811            assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
1812            assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
1813        }
1814        assert_eq!(stream.marker(), Some(""));
1815        assert_eq!(counter, 4usize);
1816
1817        Ok(())
1818    }
1819
1820    #[async_std::test]
1821    #[cfg(feature = "async")]
1822    async fn test_async_list_v1_with_error() -> anyhow::Result<()> {
1823        env_logger::builder().is_test(true).try_init().ok();
1824
1825        #[derive(Debug, Default)]
1826        struct FakeHttpCaller {
1827            counter: AtomicUsize,
1828        }
1829
1830        impl HttpCaller for FakeHttpCaller {
1831            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
1832                unreachable!()
1833            }
1834
1835            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
1836                Box::pin(async move {
1837                    let n = self.counter.fetch_add(1, Ordering::SeqCst);
1838                    let (code, body) = match n {
1839                        0 => {
1840                            assert!(request
1841                                .url()
1842                                .to_string()
1843                                .ends_with("/list?bucket=fakebucketname&limit=1000"));
1844                            (
1845                                StatusCode::OK,
1846                                AsyncResponseBody::from_bytes(
1847                                    json_to_vec(&json!({
1848                                        "marker": "fakemarker",
1849                                        "items": [{
1850                                            "key": "fakeobj1",
1851                                            "put_time": generate_put_time(),
1852                                            "hash": "fakeobj1hash",
1853                                            "fsize": 1usize,
1854                                            "mime_type": "text/plain",
1855                                        }, {
1856                                            "key": "fakeobj2",
1857                                            "put_time": generate_put_time(),
1858                                            "hash": "fakeobj2hash",
1859                                            "fsize": 2usize,
1860                                            "mime_type": "text/plain",
1861                                        }]
1862                                    }))
1863                                    .unwrap(),
1864                                ),
1865                            )
1866                        }
1867                        1 => {
1868                            assert!(request
1869                                .url()
1870                                .to_string()
1871                                .ends_with("/list?bucket=fakebucketname&marker=fakemarker&limit=1000"));
1872                            (
1873                                StatusCode::from_u16(599).unwrap(),
1874                                AsyncResponseBody::from_bytes(
1875                                    json_to_vec(&json!({
1876                                        "error": "Test Error"
1877                                    }))
1878                                    .unwrap(),
1879                                ),
1880                            )
1881                        }
1882                        _ => unreachable!(),
1883                    };
1884                    Ok(AsyncResponse::builder()
1885                        .status_code(code)
1886                        .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1887                        .body(body)
1888                        .build())
1889                })
1890            }
1891        }
1892
1893        let before_request_callback_counter = Arc::new(AtomicUsize::new(0));
1894        let after_response_ok_callback_counter = Arc::new(AtomicUsize::new(0));
1895        let after_response_error_callback_counter = Arc::new(AtomicUsize::new(0));
1896        let bucket = get_bucket(FakeHttpCaller::default());
1897        let mut iter = bucket
1898            .list()
1899            .version(ListVersion::V1)
1900            .before_request_callback({
1901                let before_request_callback_counter = before_request_callback_counter.to_owned();
1902                move |_| {
1903                    before_request_callback_counter.fetch_add(1, Ordering::Relaxed);
1904                    Ok(())
1905                }
1906            })
1907            .after_response_ok_callback({
1908                let after_response_ok_callback_counter = after_response_ok_callback_counter.to_owned();
1909                move |_| {
1910                    after_response_ok_callback_counter.fetch_add(1, Ordering::Relaxed);
1911                    Ok(())
1912                }
1913            })
1914            .after_response_error_callback({
1915                let after_response_error_callback_counter = after_response_error_callback_counter.to_owned();
1916                move |_| {
1917                    after_response_error_callback_counter.fetch_add(1, Ordering::Relaxed);
1918                    Ok(())
1919                }
1920            })
1921            .stream();
1922        let mut entry = iter.try_next().await?.unwrap();
1923        assert_eq!(entry.get_key_as_str(), "fakeobj1");
1924        assert_eq!(entry.get_hash_as_str(), "fakeobj1hash");
1925        assert_eq!(entry.get_size_as_u64(), 1u64);
1926
1927        entry = iter.try_next().await?.unwrap();
1928        assert_eq!(entry.get_key_as_str(), "fakeobj2");
1929        assert_eq!(entry.get_hash_as_str(), "fakeobj2hash");
1930        assert_eq!(entry.get_size_as_u64(), 2u64);
1931
1932        let err = iter.try_next().await.unwrap_err();
1933        assert_eq!(
1934            err.kind(),
1935            ResponseErrorKind::StatusCodeError(StatusCode::from_u16(599)?)
1936        );
1937        assert!(iter.try_next().await?.is_none());
1938        assert_eq!(iter.marker(), Some("fakemarker"));
1939        assert_eq!(before_request_callback_counter.load(Ordering::Relaxed), 2usize);
1940        assert_eq!(after_response_ok_callback_counter.load(Ordering::Relaxed), 1usize);
1941        assert_eq!(after_response_error_callback_counter.load(Ordering::Relaxed), 1usize);
1942
1943        Ok(())
1944    }
1945
1946    #[async_std::test]
1947    #[cfg(feature = "async")]
1948    async fn test_async_list_v1_with_prefix_and_limitation() -> anyhow::Result<()> {
1949        env_logger::builder().is_test(true).try_init().ok();
1950
1951        #[derive(Debug, Default)]
1952        struct FakeHttpCaller {
1953            counter: AtomicUsize,
1954        }
1955
1956        impl HttpCaller for FakeHttpCaller {
1957            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
1958                unreachable!()
1959            }
1960
1961            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
1962                Box::pin(async move {
1963                    let n = self.counter.fetch_add(1, Ordering::SeqCst);
1964                    let body = match n {
1965                        0 => {
1966                            assert!(request
1967                                .url()
1968                                .to_string()
1969                                .ends_with("/list?bucket=fakebucketname&limit=3&prefix=fakeobj"));
1970                            AsyncResponseBody::from_bytes(
1971                                json_to_vec(&json!({
1972                                    "marker": "fakemarker",
1973                                    "items": [{
1974                                        "key": "fakeobj1",
1975                                        "put_time": generate_put_time(),
1976                                        "hash": "fakeobj1hash",
1977                                        "fsize": 1usize,
1978                                        "mime_type": "text/plain",
1979                                    }, {
1980                                        "key": "fakeobj2",
1981                                        "put_time": generate_put_time(),
1982                                        "hash": "fakeobj2hash",
1983                                        "fsize": 2usize,
1984                                        "mime_type": "text/plain",
1985                                    }]
1986                                }))
1987                                .unwrap(),
1988                            )
1989                        }
1990                        1 => {
1991                            assert!(request
1992                                .url()
1993                                .to_string()
1994                                .ends_with("/list?bucket=fakebucketname&marker=fakemarker&limit=1&prefix=fakeobj"));
1995                            AsyncResponseBody::from_bytes(
1996                                json_to_vec(&json!({
1997                                    "marker": "",
1998                                    "items": [{
1999                                        "key": "fakeobj3",
2000                                        "put_time": generate_put_time(),
2001                                        "hash": "fakeobj3hash",
2002                                        "fsize": 3usize,
2003                                        "mime_type": "text/plain",
2004                                    }]
2005                                }))
2006                                .unwrap(),
2007                            )
2008                        }
2009                        _ => unreachable!(),
2010                    };
2011                    Ok(AsyncResponse::builder()
2012                        .status_code(StatusCode::OK)
2013                        .header("x-reqid", HeaderValue::from_static("FakeReqid"))
2014                        .body(body)
2015                        .build())
2016                })
2017            }
2018        }
2019
2020        let mut counter = 0usize;
2021        let bucket = get_bucket(FakeHttpCaller::default());
2022        let mut stream = bucket
2023            .list()
2024            .version(ListVersion::V1)
2025            .prefix("fakeobj")
2026            .limit(3)
2027            .stream();
2028        let mut iter = (&mut stream).enumerate();
2029        while let Some((i, entry)) = iter.next().await {
2030            counter += 1;
2031            let entry = entry?;
2032            assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
2033            assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
2034            assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
2035        }
2036        assert_eq!(stream.marker(), Some(""));
2037        assert_eq!(counter, 3usize);
2038
2039        Ok(())
2040    }
2041
2042    #[async_std::test]
2043    #[cfg(feature = "async")]
2044    async fn test_async_list_v1_with_cancellation() -> anyhow::Result<()> {
2045        env_logger::builder().is_test(true).try_init().ok();
2046
2047        #[derive(Debug, Default)]
2048        struct FakeHttpCaller {
2049            counter: AtomicUsize,
2050        }
2051
2052        impl HttpCaller for FakeHttpCaller {
2053            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
2054                unreachable!()
2055            }
2056
2057            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
2058                Box::pin(async move {
2059                    let n = self.counter.fetch_add(1, Ordering::SeqCst);
2060                    let body = match n {
2061                        0 => {
2062                            assert!(request
2063                                .url()
2064                                .to_string()
2065                                .ends_with("/list?bucket=fakebucketname&limit=1000"));
2066                            AsyncResponseBody::from_bytes(
2067                                json_to_vec(&json!({
2068                                    "marker": "fakemarker",
2069                                    "items": [{
2070                                        "key": "fakeobj1",
2071                                        "put_time": generate_put_time(),
2072                                        "hash": "fakeobj1hash",
2073                                        "fsize": 1usize,
2074                                        "mime_type": "text/plain",
2075                                    }, {
2076                                        "key": "fakeobj2",
2077                                        "put_time": generate_put_time(),
2078                                        "hash": "fakeobj2hash",
2079                                        "fsize": 2usize,
2080                                        "mime_type": "text/plain",
2081                                    }]
2082                                }))
2083                                .unwrap(),
2084                            )
2085                        }
2086                        _ => unreachable!(),
2087                    };
2088                    Ok(AsyncResponse::builder()
2089                        .status_code(StatusCode::OK)
2090                        .header("x-reqid", HeaderValue::from_static("FakeReqid"))
2091                        .body(body)
2092                        .build())
2093                })
2094            }
2095        }
2096
2097        let counter = Arc::new(AtomicUsize::new(0));
2098        {
2099            let bucket = get_bucket(FakeHttpCaller::default());
2100            let mut stream = bucket
2101                .list()
2102                .version(ListVersion::V1)
2103                .before_request_callback({
2104                    let counter = counter.to_owned();
2105                    move |_| {
2106                        if counter.load(Ordering::Relaxed) > 0 {
2107                            Err(AnyError::msg("Fake error"))
2108                        } else {
2109                            Ok(())
2110                        }
2111                    }
2112                })
2113                .stream();
2114            let mut iter = (&mut stream).enumerate();
2115            while let Some((i, entry)) = iter.next().await {
2116                if counter.fetch_add(1, Ordering::Relaxed) < 2 {
2117                    let entry = entry?;
2118                    assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
2119                    assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
2120                    assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
2121                } else {
2122                    let err = entry.unwrap_err();
2123                    assert!(matches!(
2124                        err.kind(),
2125                        ResponseErrorKind::HttpError(HttpResponseErrorKind::CallbackError { .. })
2126                    ));
2127                    break;
2128                }
2129            }
2130            assert_eq!(stream.marker(), Some("fakemarker"));
2131        }
2132        assert_eq!(Arc::try_unwrap(counter).unwrap().into_inner(), 3usize);
2133
2134        Ok(())
2135    }
2136
2137    #[async_std::test]
2138    #[cfg(feature = "async")]
2139    async fn test_async_list_v2() -> anyhow::Result<()> {
2140        env_logger::builder().is_test(true).try_init().ok();
2141
2142        #[derive(Debug, Default)]
2143        struct FakeHttpCaller {
2144            counter: AtomicUsize,
2145        }
2146
2147        impl HttpCaller for FakeHttpCaller {
2148            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
2149                unreachable!()
2150            }
2151
2152            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
2153                Box::pin(async move {
2154                    let n = self.counter.fetch_add(1, Ordering::SeqCst);
2155                    let body = match n {
2156                        0 => {
2157                            assert!(request.url().to_string().ends_with("/v2/list?bucket=fakebucketname"));
2158                            AsyncResponseBody::from_bytes(
2159                                [
2160                                    json_to_string(&json!({
2161                                        "item": {
2162                                            "key": "fakeobj1",
2163                                            "put_time": generate_put_time(),
2164                                            "hash": "fakeobj1hash",
2165                                            "fsize": 1usize,
2166                                            "mime_type": "text/plain",
2167                                        },
2168                                        "marker": "fakemarkerobj1",
2169                                    }))
2170                                    .unwrap(),
2171                                    json_to_string(&json!({
2172                                        "item": {
2173                                            "key": "fakeobj2",
2174                                            "put_time": generate_put_time(),
2175                                            "hash": "fakeobj2hash",
2176                                            "fsize": 2usize,
2177                                            "mime_type": "text/plain",
2178                                        },
2179                                        "marker": "fakemarkerobj2",
2180                                    }))
2181                                    .unwrap(),
2182                                ]
2183                                .join("\n")
2184                                .as_bytes()
2185                                .to_owned(),
2186                            )
2187                        }
2188                        1 => {
2189                            assert!(request
2190                                .url()
2191                                .to_string()
2192                                .ends_with("/list?bucket=fakebucketname&marker=fakemarkerobj2"));
2193                            AsyncResponseBody::from_bytes(
2194                                [
2195                                    json_to_string(&json!({
2196                                        "item": {
2197                                            "key": "fakeobj3",
2198                                            "put_time": generate_put_time(),
2199                                            "hash": "fakeobj3hash",
2200                                            "fsize": 3usize,
2201                                            "mime_type": "text/plain",
2202                                        },
2203                                        "marker": "fakemarkerobj3",
2204                                    }))
2205                                    .unwrap(),
2206                                    json_to_string(&json!({
2207                                        "item": {
2208                                            "key": "fakeobj4",
2209                                            "put_time": generate_put_time(),
2210                                            "hash": "fakeobj4hash",
2211                                            "fsize": 4usize,
2212                                            "mime_type": "text/plain",
2213                                        },
2214                                        "marker": "",
2215                                    }))
2216                                    .unwrap(),
2217                                ]
2218                                .join("\n")
2219                                .as_bytes()
2220                                .to_owned(),
2221                            )
2222                        }
2223                        _ => unreachable!(),
2224                    };
2225                    Ok(AsyncResponse::builder()
2226                        .status_code(StatusCode::OK)
2227                        .header("x-reqid", HeaderValue::from_static("FakeReqid"))
2228                        .body(body)
2229                        .build())
2230                })
2231            }
2232        }
2233
2234        let mut counter = 0usize;
2235        let bucket = get_bucket(FakeHttpCaller::default());
2236        let mut stream = bucket.list().version(ListVersion::V2).stream();
2237        let mut iter = (&mut stream).enumerate();
2238        while let Some((i, entry)) = iter.next().await {
2239            counter += 1;
2240            let entry = entry?;
2241            assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
2242            assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
2243            assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
2244        }
2245        assert_eq!(stream.marker(), Some(""));
2246        assert_eq!(counter, 4usize);
2247
2248        Ok(())
2249    }
2250
2251    #[async_std::test]
2252    #[cfg(feature = "async")]
2253    async fn test_async_list_v2_with_non_results() -> anyhow::Result<()> {
2254        env_logger::builder().is_test(true).try_init().ok();
2255
2256        #[derive(Debug, Default)]
2257        struct FakeHttpCaller {
2258            counter: AtomicUsize,
2259        }
2260
2261        impl HttpCaller for FakeHttpCaller {
2262            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
2263                unreachable!()
2264            }
2265
2266            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
2267                Box::pin(async move {
2268                    let n = self.counter.fetch_add(1, Ordering::SeqCst);
2269                    let body = match n {
2270                        0 => {
2271                            assert!(request
2272                                .url()
2273                                .to_string()
2274                                .ends_with("/v2/list?bucket=fakebucketname&prefix=non-exist"));
2275                            AsyncResponseBody::from_bytes(Vec::new())
2276                        }
2277                        _ => unreachable!(),
2278                    };
2279                    Ok(AsyncResponse::builder()
2280                        .status_code(StatusCode::OK)
2281                        .header("x-reqid", HeaderValue::from_static("FakeReqid"))
2282                        .body(body)
2283                        .build())
2284                })
2285            }
2286        }
2287
2288        let mut counter = 0usize;
2289        let bucket = get_bucket(FakeHttpCaller::default());
2290        let mut stream = bucket.list().version(ListVersion::V2).prefix("non-exist").stream();
2291        let mut iter = (&mut stream).enumerate();
2292        while let Some((_i, _entry)) = iter.next().await {
2293            counter += 1;
2294        }
2295        assert_eq!(stream.marker(), None);
2296        assert_eq!(counter, 0usize);
2297
2298        Ok(())
2299    }
2300
2301    #[async_std::test]
2302    #[cfg(feature = "async")]
2303    async fn test_async_list_v2_with_error() -> anyhow::Result<()> {
2304        env_logger::builder().is_test(true).try_init().ok();
2305
2306        #[derive(Debug, Default)]
2307        struct FakeHttpCaller {
2308            counter: AtomicUsize,
2309        }
2310
2311        impl HttpCaller for FakeHttpCaller {
2312            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
2313                unreachable!()
2314            }
2315
2316            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
2317                Box::pin(async move {
2318                    let n = self.counter.fetch_add(1, Ordering::SeqCst);
2319                    let (code, body) = match n {
2320                        0 => {
2321                            assert!(request.url().to_string().ends_with("/v2/list?bucket=fakebucketname"));
2322                            (
2323                                StatusCode::OK,
2324                                AsyncResponseBody::from_bytes(
2325                                    [
2326                                        json_to_string(&json!({
2327                                            "item": {
2328                                                "key": "fakeobj1",
2329                                                "put_time": generate_put_time(),
2330                                                "hash": "fakeobj1hash",
2331                                                "fsize": 1usize,
2332                                                "mime_type": "text/plain",
2333                                            },
2334                                            "marker": "fakemarkerobj1",
2335                                        }))
2336                                        .unwrap(),
2337                                        json_to_string(&json!({
2338                                            "item": {
2339                                                "key": "fakeobj2",
2340                                                "put_time": generate_put_time(),
2341                                                "hash": "fakeobj2hash",
2342                                                "fsize": 2usize,
2343                                                "mime_type": "text/plain",
2344                                            },
2345                                            "marker": "fakemarkerobj2",
2346                                        }))
2347                                        .unwrap(),
2348                                    ]
2349                                    .join("\n")
2350                                    .as_bytes()
2351                                    .to_owned(),
2352                                ),
2353                            )
2354                        }
2355                        1 => {
2356                            assert!(request
2357                                .url()
2358                                .to_string()
2359                                .ends_with("/v2/list?bucket=fakebucketname&marker=fakemarkerobj2"));
2360                            (
2361                                StatusCode::from_u16(599).unwrap(),
2362                                AsyncResponseBody::from_bytes(
2363                                    json_to_vec(&json!({
2364                                        "error": "Test Error"
2365                                    }))
2366                                    .unwrap(),
2367                                ),
2368                            )
2369                        }
2370                        _ => unreachable!(),
2371                    };
2372                    Ok(AsyncResponse::builder()
2373                        .status_code(code)
2374                        .header("x-reqid", HeaderValue::from_static("FakeReqid"))
2375                        .body(body)
2376                        .build())
2377                })
2378            }
2379        }
2380
2381        let before_request_callback_counter = Arc::new(AtomicUsize::new(0));
2382        let after_response_ok_callback_counter = Arc::new(AtomicUsize::new(0));
2383        let after_response_error_callback_counter = Arc::new(AtomicUsize::new(0));
2384        let bucket = get_bucket(FakeHttpCaller::default());
2385        let mut stream = bucket
2386            .list()
2387            .version(ListVersion::V2)
2388            .before_request_callback({
2389                let before_request_callback_counter = before_request_callback_counter.to_owned();
2390                move |_| {
2391                    before_request_callback_counter.fetch_add(1, Ordering::Relaxed);
2392                    Ok(())
2393                }
2394            })
2395            .after_response_ok_callback({
2396                let after_response_ok_callback_counter = after_response_ok_callback_counter.to_owned();
2397                move |_| {
2398                    after_response_ok_callback_counter.fetch_add(1, Ordering::Relaxed);
2399                    Ok(())
2400                }
2401            })
2402            .after_response_error_callback({
2403                let after_response_error_callback_counter = after_response_error_callback_counter.to_owned();
2404                move |_| {
2405                    after_response_error_callback_counter.fetch_add(1, Ordering::Relaxed);
2406                    Ok(())
2407                }
2408            })
2409            .stream();
2410        let mut entry = stream.try_next().await?.unwrap();
2411        assert_eq!(entry.get_key_as_str(), "fakeobj1");
2412        assert_eq!(entry.get_hash_as_str(), "fakeobj1hash");
2413        assert_eq!(entry.get_size_as_u64(), 1u64);
2414
2415        entry = stream.try_next().await?.unwrap();
2416        assert_eq!(entry.get_key_as_str(), "fakeobj2");
2417        assert_eq!(entry.get_hash_as_str(), "fakeobj2hash");
2418        assert_eq!(entry.get_size_as_u64(), 2u64);
2419
2420        let err = stream.try_next().await.unwrap_err();
2421        assert_eq!(
2422            err.kind(),
2423            ResponseErrorKind::StatusCodeError(StatusCode::from_u16(599)?)
2424        );
2425        assert!(stream.try_next().await?.is_none());
2426        assert_eq!(stream.marker(), Some("fakemarkerobj2"));
2427        assert_eq!(before_request_callback_counter.load(Ordering::Relaxed), 2usize);
2428        assert_eq!(after_response_ok_callback_counter.load(Ordering::Relaxed), 1usize);
2429        assert_eq!(after_response_error_callback_counter.load(Ordering::Relaxed), 1usize);
2430
2431        Ok(())
2432    }
2433
2434    #[async_std::test]
2435    #[cfg(feature = "async")]
2436    async fn test_async_list_v2_with_cancellation() -> anyhow::Result<()> {
2437        env_logger::builder().is_test(true).try_init().ok();
2438
2439        #[derive(Debug, Default)]
2440        struct FakeHttpCaller {
2441            counter: AtomicUsize,
2442        }
2443
2444        impl HttpCaller for FakeHttpCaller {
2445            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
2446                unreachable!()
2447            }
2448
2449            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
2450                Box::pin(async move {
2451                    let n = self.counter.fetch_add(1, Ordering::SeqCst);
2452                    let body = match n {
2453                        0 => {
2454                            assert!(request.url().to_string().ends_with("/v2/list?bucket=fakebucketname"));
2455                            AsyncResponseBody::from_bytes(
2456                                [
2457                                    json_to_string(&json!({
2458                                        "item": {
2459                                            "key": "fakeobj1",
2460                                            "put_time": generate_put_time(),
2461                                            "hash": "fakeobj1hash",
2462                                            "fsize": 1usize,
2463                                            "mime_type": "text/plain",
2464                                        },
2465                                        "marker": "fakemarkerobj1",
2466                                    }))
2467                                    .unwrap(),
2468                                    json_to_string(&json!({
2469                                        "item": {
2470                                            "key": "fakeobj2",
2471                                            "put_time": generate_put_time(),
2472                                            "hash": "fakeobj2hash",
2473                                            "fsize": 2usize,
2474                                            "mime_type": "text/plain",
2475                                        },
2476                                        "marker": "fakemarkerobj2",
2477                                    }))
2478                                    .unwrap(),
2479                                ]
2480                                .join("\n")
2481                                .as_bytes()
2482                                .to_owned(),
2483                            )
2484                        }
2485                        _ => unreachable!(),
2486                    };
2487                    Ok(AsyncResponse::builder()
2488                        .status_code(StatusCode::OK)
2489                        .header("x-reqid", HeaderValue::from_static("FakeReqid"))
2490                        .body(body)
2491                        .build())
2492                })
2493            }
2494        }
2495
2496        let counter = Arc::new(AtomicUsize::new(0));
2497        {
2498            let bucket = get_bucket(FakeHttpCaller::default());
2499            let mut stream = bucket
2500                .list()
2501                .version(ListVersion::V2)
2502                .before_request_callback({
2503                    let counter = counter.to_owned();
2504                    move |_| {
2505                        if counter.load(Ordering::Relaxed) > 0 {
2506                            Err(AnyError::msg("Fake error"))
2507                        } else {
2508                            Ok(())
2509                        }
2510                    }
2511                })
2512                .stream();
2513            let mut iter = (&mut stream).enumerate();
2514            while let Some((i, entry)) = iter.next().await {
2515                if counter.fetch_add(1, Ordering::Relaxed) < 2 {
2516                    let entry = entry?;
2517                    assert_eq!(entry.get_key_as_str(), &format!("fakeobj{}", i + 1));
2518                    assert_eq!(entry.get_hash_as_str(), &format!("fakeobj{}hash", i + 1));
2519                    assert_eq!(entry.get_size_as_u64(), i as u64 + 1);
2520                } else {
2521                    let err = entry.unwrap_err();
2522                    assert!(matches!(
2523                        err.kind(),
2524                        ResponseErrorKind::HttpError(HttpResponseErrorKind::CallbackError { .. })
2525                    ));
2526                    break;
2527                }
2528            }
2529            assert_eq!(stream.marker(), Some("fakemarkerobj2"));
2530        }
2531        assert_eq!(Arc::try_unwrap(counter).unwrap().into_inner(), 3usize);
2532
2533        Ok(())
2534    }
2535
2536    fn get_bucket(caller: impl HttpCaller + 'static) -> Bucket {
2537        let object_manager = ObjectsManager::builder(get_credential())
2538            .http_client(
2539                HttpClient::builder(caller)
2540                    .chooser(DirectChooser)
2541                    .request_retrier(NeverRetrier)
2542                    .backoff(NO_BACKOFF)
2543                    .build(),
2544            )
2545            .build();
2546        object_manager.bucket_with_region(get_bucket_name(), single_rsf_domain_region())
2547    }
2548
2549    fn get_credential() -> Credential {
2550        Credential::new("fakeaccesskey", "fakesecretkey")
2551    }
2552
2553    fn get_bucket_name() -> BucketName {
2554        "fakebucketname".into()
2555    }
2556
2557    fn generate_put_time() -> u64 {
2558        SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as u64 / 100
2559    }
2560
2561    fn single_rsf_domain_region() -> Region {
2562        Region::builder("chaotic")
2563            .add_rsf_preferred_endpoint(("fakersf.example.com".to_owned(), 8080).into())
2564            .build()
2565    }
2566}