stripe_client_core/
pagination.rs

1use std::collections::VecDeque;
2
3use miniserde::Deserialize;
4use serde::Serialize;
5use serde_json::Value;
6use stripe_types::{AsCursorOpt, List, Object, SearchList};
7
8use crate::{RequestBuilder, StripeBlockingClient, StripeClient, StripeMethod};
9
10/// A trait allowing `List<T>` and `SearchList<T>` to be treated the same. Not part of the
11/// public API.
12///
13/// NB: this trait is designed specifically for `List` and `SearchList` and may not be sensible
14/// in other cases. One gotcha is that `into_parts` and `from_parts` do not necessarily
15/// round-trip, e.g. `SearchList<T>` will lose the `next_page` field since that
16/// is not part of the shared list impl. We account for this by ensuring to call `update_params`
17/// before breaking the `SearchList` into pieces.
18#[doc(hidden)]
19pub trait PaginableList: Deserialize {
20    /// Underlying single element type, e.g. `Account`
21    type Data;
22
23    /// Break into the shared parts list pagination requires
24    fn into_parts(self) -> ListParts<Self::Data>;
25
26    /// Reconstruct from the shared parts list pagination requires
27    fn from_parts(parts: ListParts<Self::Data>) -> Self;
28
29    /// Update the current parameter set, with `self` as the most
30    /// recently fetched page.
31    ///
32    /// NB: this should also set `has_more` to `false` explicitly if we don't have a new cursor.
33    /// (This seems redundant with `has_more` but is used to provide extra protection
34    /// against any possibility where `has_more` is `true`, but the cursor is back to `None`,
35    /// potentially leading to an infinite pagination loop).
36    fn update_params(&mut self, params: &mut Value);
37}
38
/// The subset of list state the client needs in order to paginate, shared by every
/// `PaginableList` implementation.
#[doc(hidden)]
#[derive(Debug)]
pub struct ListParts<T> {
    /// Total number of results, when the originating list type carries one.
    pub total_count: Option<u64>,
    /// URL used when requesting the following page.
    pub url: String,
    /// Items of the current page; a deque so they can be popped from the front.
    pub data: VecDeque<T>,
    /// Whether another page should be requested once `data` is exhausted.
    pub has_more: bool,
}
48
49impl<T> PaginableList for List<T>
50where
51    T: Object,
52    List<T>: Deserialize,
53{
54    type Data = T;
55
56    fn into_parts(self) -> ListParts<Self::Data> {
57        ListParts {
58            // total_count is not present in `List`, but still present in `ListParts` because the stripe API
59            // claims search pagination can still include `total_count` if requested
60            total_count: None,
61            url: self.url,
62            data: VecDeque::from(self.data),
63            has_more: self.has_more,
64        }
65    }
66
67    fn from_parts(parts: ListParts<Self::Data>) -> Self {
68        Self { data: Vec::from(parts.data), has_more: parts.has_more, url: parts.url }
69    }
70
71    fn update_params(&mut self, params: &mut Value) {
72        if let Some(new_cursor) = self.data.last().and_then(|l| l.id().as_cursor_opt()) {
73            params["starting_after"] = Value::String(new_cursor.into());
74        } else {
75            self.has_more = false;
76        }
77    }
78}
79
80impl<T> PaginableList for SearchList<T>
81where
82    SearchList<T>: Deserialize,
83{
84    type Data = T;
85
86    /// NB: here we lose `next_page`, so we should be sure to `update_params`
87    /// before calling this.
88    fn into_parts(self) -> ListParts<Self::Data> {
89        ListParts {
90            total_count: self.total_count,
91            url: self.url,
92            data: VecDeque::from(self.data),
93            has_more: self.has_more,
94        }
95    }
96
97    fn from_parts(parts: ListParts<Self::Data>) -> Self {
98        Self {
99            url: parts.url,
100            has_more: parts.has_more,
101            data: Vec::from(parts.data),
102            next_page: None,
103            total_count: parts.total_count,
104        }
105    }
106
107    fn update_params(&mut self, params: &mut Value) {
108        if let Some(next_page) = self.next_page.take() {
109            params["page"] = Value::String(next_page);
110        } else {
111            self.has_more = false;
112        }
113    }
114}
115
116/// An extension trait to allow converting `List<T>` and `SearchList<T>` into
117/// a type that can be paginated. Not meant to be implemented by any other types.
118pub trait PaginationExt {
119    /// The underlying pagination type, e.g. `List<T>` or `SearchList<T>`.
120    type Data;
121
122    /// Use the current page state to construct an adaptor capable of paginating
123    /// from where the current data left off.
124    fn into_paginator(self) -> ListPaginator<Self::Data>;
125}
126
127impl<T> PaginationExt for List<T>
128where
129    T: Sync + Send + 'static,
130    List<T>: PaginableList,
131{
132    type Data = List<T>;
133
134    fn into_paginator(mut self) -> ListPaginator<List<T>> {
135        let mut params = Default::default();
136        self.update_params(&mut params);
137        ListPaginator { page: self, params }
138    }
139}
140
141impl<T> PaginationExt for SearchList<T>
142where
143    T: Sync + Send + 'static,
144    SearchList<T>: PaginableList,
145{
146    type Data = SearchList<T>;
147
148    fn into_paginator(mut self) -> ListPaginator<SearchList<T>> {
149        let mut params = Default::default();
150        self.update_params(&mut params);
151        ListPaginator { page: self, params }
152    }
153}
154
155/// Stream designed to support pagination.
156#[derive(Debug)]
157pub struct ListPaginator<T> {
158    page: T,
159    params: Value,
160}
161
162impl<T> ListPaginator<SearchList<T>> {
163    /// Kept public so that the generated code crates can access this trait. Used by `Search*` params
164    /// to implement `PaginationExt`. Not part of the public API.
165    #[doc(hidden)]
166    pub fn new_search_list(url: impl Into<String>, params: impl Serialize) -> Self {
167        let page = SearchList {
168            url: url.into(),
169            has_more: true,
170            data: vec![],
171            next_page: None,
172            total_count: None,
173        };
174        Self {
175            page,
176            params: serde_json::to_value(params)
177                // Expect should be safe since we control which types call this
178                .expect("all Stripe types implement `Serialize` infallibly"),
179        }
180    }
181}
182
183impl<T> ListPaginator<List<T>> {
184    /// Kept public so that the generated code crates can access this trait. Used by `List*` params
185    /// to implement `PaginationExt`. Not part of the public API.
186    #[doc(hidden)]
187    pub fn new_list(url: impl Into<String>, params: impl Serialize) -> Self {
188        let page = List { data: vec![], has_more: true, url: url.into() };
189        Self {
190            page,
191            params: serde_json::to_value(params)
192                .expect("all Stripe types implement `Serialize` infallibly"),
193        }
194    }
195}
196
197fn req_builder(url: &str) -> RequestBuilder {
198    RequestBuilder::new(StripeMethod::Get, url.trim_start_matches("/v1"))
199}
200
201impl<T> ListPaginator<T>
202where
203    T: Sync + Send + 'static + PaginableList,
204{
205    /// Repeatedly queries Stripe for more data until all elements in list are fetched, using
206    /// Stripe's default page size.
207    ///
208    /// # Errors
209    /// If any pagination request returns an error.
210    pub fn get_all<C: StripeBlockingClient>(self, client: &C) -> Result<Vec<T::Data>, C::Err> {
211        let mut data = VecDeque::new();
212        let mut parts = self.page.into_parts();
213        let mut params = self.params;
214        loop {
215            // `append` empties `parts.data`
216            data.append(&mut parts.data);
217
218            if !parts.has_more {
219                break;
220            }
221
222            let req = req_builder(&parts.url).query(&params);
223            let mut next_page: T = req.customize().send_blocking(client)?;
224            next_page.update_params(&mut params);
225            parts = next_page.into_parts();
226        }
227        Ok(Vec::from(data))
228    }
229
230    /// Get all values in this List, consuming self and lazily paginating until all values are fetched.
231    ///
232    /// This function repeatedly queries Stripe for more data until all elements in list are fetched, using
233    /// the page size specified in params, or Stripe's default page size if none is specified.
234    pub fn stream<C: StripeClient + Clone>(
235        self,
236        client: &C,
237    ) -> impl futures_util::Stream<Item = Result<T::Data, C::Err>> + Unpin {
238        // Using VecDeque, we can pop from the front without needing to reverse
239        Box::pin(futures_util::stream::unfold(Some((self, client.clone())), Self::unfold_stream))
240    }
241
242    /// Unfold a single item from the stream.
243    async fn unfold_stream<C: StripeClient + Clone>(
244        state: Option<(Self, C)>,
245    ) -> Option<(Result<T::Data, C::Err>, Option<(Self, C)>)> {
246        let (paginator, client) = state?; // If none, our last request was an error, so stop here
247        let mut parts = paginator.page.into_parts();
248        if let Some(next_val) = parts.data.pop_front() {
249            // We have more data on this page
250            return Some((
251                Ok(next_val),
252                Some((Self { page: T::from_parts(parts), params: paginator.params }, client)),
253            ));
254        }
255
256        // Final value of the stream, no errors
257        if !parts.has_more {
258            return None;
259        }
260
261        let req = req_builder(&parts.url).query(&paginator.params);
262        match req.customize::<T>().send(&client).await {
263            Ok(mut next_page) => {
264                let mut params = paginator.params;
265                next_page.update_params(&mut params);
266
267                let mut parts = next_page.into_parts();
268
269                let next_val = parts.data.pop_front()?;
270
271                // Yield last value of this page, the next page (and client) becomes the state
272                Some((Ok(next_val), Some((Self { page: T::from_parts(parts), params }, client))))
273            }
274            Err(err) => Some((Err(err), None)), // We ran into an error. The last value of the stream will be the error.
275        }
276    }
277}