stripe_client_core/
pagination.rs

1use miniserde::Deserialize;
2use serde::Serialize;
3use serde_json::Value;
4use stripe_types::{AsCursorOpt, List, Object, SearchList};
5
6use crate::{RequestBuilder, StripeBlockingClient, StripeClient, StripeMethod};
7
8/// A trait allowing `List<T>` and `SearchList<T>` to be treated the same. Not part of the
9/// public API.
10///
11/// NB: this trait is designed specifically for `List` and `SearchList` and may not be sensible
12/// in other cases. One gotcha is that `into_parts` and `from_parts` do not necessarily
13/// round-trip, e.g. `SearchList<T>` will lose the `next_page` field since that
14/// is not part of the shared list impl. We account for this by ensuring to call `update_params`
15/// before breaking the `SearchList` into pieces.
16#[doc(hidden)]
17pub trait PaginableList: Deserialize {
18    /// Underlying single element type, e.g. `Account`
19    type Data;
20
21    /// Break into the shared parts list pagination requires
22    fn into_parts(self) -> ListParts<Self::Data>;
23
24    /// Reconstruct from the shared parts list pagination requires
25    fn from_parts(parts: ListParts<Self::Data>) -> Self;
26
27    /// Update the current parameter set, with `self` as the most
28    /// recently fetched page.
29    ///
30    /// NB: this should also set `has_more` to `false` explicitly if we don't have a new cursor.
31    /// (This seems redundant with `has_more` but is used to provide extra protection
32    /// against any possibility where `has_more` is `true`, but the cursor is back to `None`,
33    /// potentially leading to an infinite pagination loop).
34    fn update_params(&mut self, params: &mut Value);
35}
36
37/// Specific list parts relied on by the client for pagination.
38#[doc(hidden)]
39#[derive(Debug)]
40pub struct ListParts<T> {
41    pub total_count: Option<u64>,
42    pub url: String,
43    pub data: Vec<T>,
44    pub has_more: bool,
45}
46
47impl<T> PaginableList for List<T>
48where
49    T: Object,
50    List<T>: Deserialize,
51{
52    type Data = T;
53
54    fn into_parts(self) -> ListParts<Self::Data> {
55        ListParts {
56            // total_count is not present in `List`, but still present in `ListParts` because the stripe API
57            // claims search pagination can still include `total_count` if requested
58            total_count: None,
59            url: self.url,
60            data: self.data,
61            has_more: self.has_more,
62        }
63    }
64
65    fn from_parts(parts: ListParts<Self::Data>) -> Self {
66        Self { data: parts.data, has_more: parts.has_more, url: parts.url }
67    }
68
69    fn update_params(&mut self, params: &mut Value) {
70        if let Some(new_cursor) = self.data.last().and_then(|l| l.id().as_cursor_opt()) {
71            params["starting_after"] = Value::String(new_cursor.into());
72        } else {
73            self.has_more = false;
74        }
75    }
76}
77
78impl<T> PaginableList for SearchList<T>
79where
80    SearchList<T>: Deserialize,
81{
82    type Data = T;
83
84    /// NB: here we lose `next_page`, so we should be sure to `update_params`
85    /// before calling this.
86    fn into_parts(self) -> ListParts<Self::Data> {
87        ListParts {
88            total_count: self.total_count,
89            url: self.url,
90            data: self.data,
91            has_more: self.has_more,
92        }
93    }
94
95    fn from_parts(parts: ListParts<Self::Data>) -> Self {
96        Self {
97            url: parts.url,
98            has_more: parts.has_more,
99            data: parts.data,
100            next_page: None,
101            total_count: parts.total_count,
102        }
103    }
104
105    fn update_params(&mut self, params: &mut Value) {
106        if let Some(next_page) = self.next_page.take() {
107            params["page"] = Value::String(next_page);
108        } else {
109            self.has_more = false;
110        }
111    }
112}
113
114/// An extension trait to allow converting `List<T>` and `SearchList<T>` into
115/// a type that can be paginated. Not meant to be implemented by any other types.
116pub trait PaginationExt {
117    /// The underlying pagination type, e.g. `List<T>` or `SearchList<T>`.
118    type Data;
119
120    /// Use the current page state to construct an adaptor capable of paginating
121    /// from where the current data left off.
122    fn into_paginator(self) -> ListPaginator<Self::Data>;
123}
124
125impl<T> PaginationExt for List<T>
126where
127    T: Sync + Send + 'static,
128    List<T>: PaginableList,
129{
130    type Data = List<T>;
131
132    fn into_paginator(mut self) -> ListPaginator<List<T>> {
133        let mut params = Default::default();
134        self.update_params(&mut params);
135        ListPaginator { page: self, params }
136    }
137}
138
139impl<T> PaginationExt for SearchList<T>
140where
141    T: Sync + Send + 'static,
142    SearchList<T>: PaginableList,
143{
144    type Data = SearchList<T>;
145
146    fn into_paginator(mut self) -> ListPaginator<SearchList<T>> {
147        let mut params = Default::default();
148        self.update_params(&mut params);
149        ListPaginator { page: self, params }
150    }
151}
152
153/// Stream designed to support pagination.
154#[derive(Debug)]
155pub struct ListPaginator<T> {
156    page: T,
157    params: Value,
158}
159
160impl<T> ListPaginator<SearchList<T>> {
161    /// Kept public so that the generated code crates can access this trait. Used by `Search*` params
162    /// to implement `PaginationExt`. Not part of the public API.
163    #[doc(hidden)]
164    pub fn new_search_list(url: impl Into<String>, params: impl Serialize) -> Self {
165        let page = SearchList {
166            url: url.into(),
167            has_more: true,
168            data: vec![],
169            next_page: None,
170            total_count: None,
171        };
172        Self {
173            page,
174            params: serde_json::to_value(params)
175                // Expect should be safe since we control which types call this
176                .expect("all Stripe types implement `Serialize` infallibly"),
177        }
178    }
179}
180
181impl<T> ListPaginator<List<T>> {
182    /// Kept public so that the generated code crates can access this trait. Used by `List*` params
183    /// to implement `PaginationExt`. Not part of the public API.
184    #[doc(hidden)]
185    pub fn new_list(url: impl Into<String>, params: impl Serialize) -> Self {
186        let page = List { data: vec![], has_more: true, url: url.into() };
187        Self {
188            page,
189            params: serde_json::to_value(params)
190                .expect("all Stripe types implement `Serialize` infallibly"),
191        }
192    }
193}
194
195fn req_builder(url: &str) -> RequestBuilder {
196    RequestBuilder::new(StripeMethod::Get, url.trim_start_matches("/v1"))
197}
198
199impl<T> ListPaginator<T>
200where
201    T: Sync + Send + 'static + PaginableList,
202{
203    /// Repeatedly queries Stripe for more data until all elements in list are fetched, using
204    /// Stripe's default page size.
205    ///
206    /// # Errors
207    /// If any pagination request returns an error.
208    pub fn get_all<C: StripeBlockingClient>(self, client: &C) -> Result<Vec<T::Data>, C::Err> {
209        let mut data = vec![];
210        let mut parts = self.page.into_parts();
211        let mut params = self.params;
212        loop {
213            // `append` empties `parts.data`
214            data.append(&mut parts.data);
215
216            if !parts.has_more {
217                break;
218            }
219
220            let req = req_builder(&parts.url).query(&params);
221            let mut next_page: T = req.customize().send_blocking(client)?;
222            next_page.update_params(&mut params);
223            parts = next_page.into_parts();
224        }
225        Ok(data)
226    }
227
228    /// Get all values in this List, consuming self and lazily paginating until all values are fetched.
229    ///
230    /// This function repeatedly queries Stripe for more data until all elements in list are fetched, using
231    /// the page size specified in params, or Stripe's default page size if none is specified.
232    pub fn stream<C: StripeClient + Clone>(
233        self,
234        client: &C,
235    ) -> impl futures_util::Stream<Item = Result<T::Data, C::Err>> + Unpin {
236        // We are going to be popping items off the end of the list, so we need to reverse it.
237        let mut page = self.page.into_parts();
238        page.data.reverse();
239        let paginator = ListPaginator { page: T::from_parts(page), params: self.params };
240
241        Box::pin(futures_util::stream::unfold(
242            Some((paginator, client.clone())),
243            Self::unfold_stream,
244        ))
245    }
246
247    /// Unfold a single item from the stream.
248    async fn unfold_stream<C: StripeClient + Clone>(
249        state: Option<(Self, C)>,
250    ) -> Option<(Result<T::Data, C::Err>, Option<(Self, C)>)> {
251        let (paginator, client) = state?; // If none, our last request was an error, so stop here
252        let mut parts = paginator.page.into_parts();
253        if let Some(next_val) = parts.data.pop() {
254            // We have more data on this page
255            return Some((
256                Ok(next_val),
257                Some((Self { page: T::from_parts(parts), params: paginator.params }, client)),
258            ));
259        }
260
261        // Final value of the stream, no errors
262        if !parts.has_more {
263            return None;
264        }
265
266        let req = req_builder(&parts.url).query(&paginator.params);
267        match req.customize::<T>().send(&client).await {
268            Ok(mut next_page) => {
269                let mut params = paginator.params;
270                next_page.update_params(&mut params);
271
272                let mut parts = next_page.into_parts();
273
274                // We are going to be popping items off the end of the list, so we need to reverse it.
275                parts.data.reverse();
276
277                let next_val = parts.data.pop()?;
278
279                // Yield last value of this page, the next page (and client) becomes the state
280                Some((Ok(next_val), Some((Self { page: T::from_parts(parts), params }, client))))
281            }
282            Err(err) => Some((Err(err), None)), // We ran into an error. The last value of the stream will be the error.
283        }
284    }
285}