stripe_client_core/
pagination.rs

1use miniserde::Deserialize;
2use serde::Serialize;
3use serde_json::Value;
4use stripe_types::{AsCursorOpt, List, Object, SearchList};
5
6use crate::{RequestBuilder, StripeBlockingClient, StripeClient, StripeMethod};
7
8/// A trait allowing `List<T>` and `SearchList<T>` to be treated the same. Not part of the
9/// public API.
10///
11/// NB: this trait is designed specifically for `List` and `SearchList` and may not be sensible
12/// in other cases. One gotcha is that `into_parts` and `from_parts` do not necessarily
13/// round-trip, e.g. `SearchList<T>` will lose the `next_page` field since that
14/// is not part of the shared list impl. We account for this by ensuring to call `update_params`
15/// before breaking the `SearchList` into pieces.
16#[doc(hidden)]
17pub trait PaginableList: Deserialize {
18    /// Underlying single element type, e.g. `Account`
19    type Data;
20
21    /// Break into the shared parts list pagination requires
22    fn into_parts(self) -> ListParts<Self::Data>;
23
24    /// Reconstruct from the shared parts list pagination requires
25    fn from_parts(parts: ListParts<Self::Data>) -> Self;
26
27    /// Update the current parameter set, with `self` as the most
28    /// recently fetched page.
29    ///
30    /// NB: this should also set `has_more` to `false` explicitly if we don't have a new cursor.
31    /// (This seems redundant with `has_more` but is used to provide extra protection
32    /// against any possibility where `has_more` is `true`, but the cursor is back to `None`,
33    /// potentially leading to an infinite pagination loop).
34    fn update_params(&mut self, params: &mut Value);
35}
36
37/// Specific list parts relied on by the client for pagination.
38#[doc(hidden)]
39#[derive(Debug)]
40pub struct ListParts<T> {
41    pub total_count: Option<u64>,
42    pub url: String,
43    pub data: Vec<T>,
44    pub has_more: bool,
45}
46
47impl<T> PaginableList for List<T>
48where
49    T: Object,
50    List<T>: Deserialize,
51{
52    type Data = T;
53
54    fn into_parts(self) -> ListParts<Self::Data> {
55        ListParts {
56            total_count: self.total_count,
57            url: self.url,
58            data: self.data,
59            has_more: self.has_more,
60        }
61    }
62
63    fn from_parts(parts: ListParts<Self::Data>) -> Self {
64        Self {
65            data: parts.data,
66            has_more: parts.has_more,
67            total_count: parts.total_count,
68            url: parts.url,
69        }
70    }
71
72    fn update_params(&mut self, params: &mut Value) {
73        if let Some(new_cursor) = self.data.last().and_then(|l| l.id().as_cursor_opt()) {
74            params["starting_after"] = Value::String(new_cursor.into());
75        } else {
76            self.has_more = false;
77        }
78    }
79}
80
81impl<T> PaginableList for SearchList<T>
82where
83    SearchList<T>: Deserialize,
84{
85    type Data = T;
86
87    /// NB: here we lose `next_page`, so we should be sure to `update_params`
88    /// before calling this.
89    fn into_parts(self) -> ListParts<Self::Data> {
90        ListParts {
91            total_count: self.total_count,
92            url: self.url,
93            data: self.data,
94            has_more: self.has_more,
95        }
96    }
97
98    fn from_parts(parts: ListParts<Self::Data>) -> Self {
99        Self {
100            url: parts.url,
101            has_more: parts.has_more,
102            data: parts.data,
103            next_page: None,
104            total_count: parts.total_count,
105        }
106    }
107
108    fn update_params(&mut self, params: &mut Value) {
109        if let Some(next_page) = self.next_page.take() {
110            params["page"] = Value::String(next_page);
111        } else {
112            self.has_more = false;
113        }
114    }
115}
116
117/// An extension trait to allow converting `List<T>` and `SearchList<T>` into
118/// a type that can be paginated. Not meant to be implemented by any other types.
119pub trait PaginationExt {
120    /// The underlying pagination type, e.g. `List<T>` or `SearchList<T>`.
121    type Data;
122
123    /// Use the current page state to construct an adaptor capable of paginating
124    /// from where the current data left off.
125    fn into_paginator(self) -> ListPaginator<Self::Data>;
126}
127
128impl<T> PaginationExt for List<T>
129where
130    T: Sync + Send + 'static,
131    List<T>: PaginableList,
132{
133    type Data = List<T>;
134
135    fn into_paginator(mut self) -> ListPaginator<List<T>> {
136        let mut params = Default::default();
137        self.update_params(&mut params);
138        ListPaginator { page: self, params }
139    }
140}
141
142impl<T> PaginationExt for SearchList<T>
143where
144    T: Sync + Send + 'static,
145    SearchList<T>: PaginableList,
146{
147    type Data = SearchList<T>;
148
149    fn into_paginator(mut self) -> ListPaginator<SearchList<T>> {
150        let mut params = Default::default();
151        self.update_params(&mut params);
152        ListPaginator { page: self, params }
153    }
154}
155
156/// Stream designed to support pagination.
157#[derive(Debug)]
158pub struct ListPaginator<T> {
159    page: T,
160    params: Value,
161}
162
163impl<T> ListPaginator<SearchList<T>> {
164    /// Kept public so that the generated code crates can access this trait. Used by `Search*` params
165    /// to implement `PaginationExt`. Not part of the public API.
166    #[doc(hidden)]
167    pub fn new_search_list(url: impl Into<String>, params: impl Serialize) -> Self {
168        let page = SearchList {
169            url: url.into(),
170            has_more: true,
171            data: vec![],
172            next_page: None,
173            total_count: None,
174        };
175        Self {
176            page,
177            params: serde_json::to_value(params)
178                // Expect should be safe since we control which types call this
179                .expect("all Stripe types implement `Serialize` infallibly"),
180        }
181    }
182}
183
184impl<T> ListPaginator<List<T>> {
185    /// Kept public so that the generated code crates can access this trait. Used by `List*` params
186    /// to implement `PaginationExt`. Not part of the public API.
187    #[doc(hidden)]
188    pub fn new_list(url: impl Into<String>, params: impl Serialize) -> Self {
189        let page = List { data: vec![], has_more: true, total_count: None, url: url.into() };
190        Self {
191            page,
192            params: serde_json::to_value(params)
193                .expect("all Stripe types implement `Serialize` infallibly"),
194        }
195    }
196}
197
198fn req_builder(url: &str) -> RequestBuilder {
199    RequestBuilder::new(StripeMethod::Get, url.trim_start_matches("/v1"))
200}
201
202impl<T> ListPaginator<T>
203where
204    T: Sync + Send + 'static + PaginableList,
205{
206    /// Repeatedly queries Stripe for more data until all elements in list are fetched, using
207    /// Stripe's default page size.
208    ///
209    /// # Errors
210    /// If any pagination request returns an error.
211    pub fn get_all<C: StripeBlockingClient>(self, client: &C) -> Result<Vec<T::Data>, C::Err> {
212        let mut data = vec![];
213        let mut parts = self.page.into_parts();
214        let mut params = self.params;
215        loop {
216            // `append` empties `parts.data`
217            data.append(&mut parts.data);
218
219            if !parts.has_more {
220                break;
221            }
222
223            let req = req_builder(&parts.url).query(&params);
224            let mut next_page: T = req.customize().send_blocking(client)?;
225            next_page.update_params(&mut params);
226            parts = next_page.into_parts();
227        }
228        Ok(data)
229    }
230
231    /// Get all values in this List, consuming self and lazily paginating until all values are fetched.
232    ///
233    /// This function repeatedly queries Stripe for more data until all elements in list are fetched, using
234    /// the page size specified in params, or Stripe's default page size if none is specified.
235    pub fn stream<C: StripeClient + Clone>(
236        self,
237        client: &C,
238    ) -> impl futures_util::Stream<Item = Result<T::Data, C::Err>> + Unpin {
239        // We are going to be popping items off the end of the list, so we need to reverse it.
240        let mut page = self.page.into_parts();
241        page.data.reverse();
242        let paginator = ListPaginator { page: T::from_parts(page), params: self.params };
243
244        Box::pin(futures_util::stream::unfold(
245            Some((paginator, client.clone())),
246            Self::unfold_stream,
247        ))
248    }
249
250    /// Unfold a single item from the stream.
251    async fn unfold_stream<C: StripeClient + Clone>(
252        state: Option<(Self, C)>,
253    ) -> Option<(Result<T::Data, C::Err>, Option<(Self, C)>)> {
254        let (paginator, client) = state?; // If none, our last request was an error, so stop here
255        let mut parts = paginator.page.into_parts();
256        if let Some(next_val) = parts.data.pop() {
257            // We have more data on this page
258            return Some((
259                Ok(next_val),
260                Some((Self { page: T::from_parts(parts), params: paginator.params }, client)),
261            ));
262        }
263
264        // Final value of the stream, no errors
265        if !parts.has_more {
266            return None;
267        }
268
269        let req = req_builder(&parts.url).query(&paginator.params);
270        match req.customize::<T>().send(&client).await {
271            Ok(mut next_page) => {
272                let mut params = paginator.params;
273                next_page.update_params(&mut params);
274
275                let mut parts = next_page.into_parts();
276
277                // We are going to be popping items off the end of the list, so we need to reverse it.
278                parts.data.reverse();
279
280                let next_val = parts.data.pop()?;
281
282                // Yield last value of this page, the next page (and client) becomes the state
283                Some((Ok(next_val), Some((Self { page: T::from_parts(parts), params }, client))))
284            }
285            Err(err) => Some((Err(err), None)), // We ran into an error. The last value of the stream will be the error.
286        }
287    }
288}