payjp_client_core/
pagination.rs

1use miniserde::Deserialize;
2use serde::Serialize;
3use serde_json::Value;
4use payjp_types::{AsCursorOpt, List, Object, };
5
6use crate::{RequestBuilder, BlockingClient, PayjpClient, PayjpMethod};
7
8/// A trait allowing `List<T>` to be treated the same. Not part of the
9/// public API.
10///
11/// NB: this trait is designed specifically for `List` and may not be sensible
12/// in other cases. One gotcha is that `into_parts` and `from_parts` do not necessarily
13/// round-trip, e.g. `List<T>` will lose the `next_page` field since that
14/// is not part of the shared list impl. We account for this by ensuring to call `update_params`
15/// before breaking the `List` into pieces.
16#[doc(hidden)]
17pub trait PaginableList: Deserialize {
18    /// Underlying single element type, e.g. `Account`
19    type Data;
20
21    /// Break into the shared parts list pagination requires
22    fn into_parts(self) -> ListParts<Self::Data>;
23
24    /// Reconstruct from the shared parts list pagination requires
25    fn from_parts(parts: ListParts<Self::Data>) -> Self;
26
27    /// Update the current parameter set, with `self` as the most
28    /// recently fetched page.
29    ///
30    /// NB: this should also set `has_more` to `false` explicitly if we don't have a new cursor.
31    /// (This seems redundant with `has_more` but is used to provide extra protection
32    /// against any possibility where `has_more` is `true`, but the cursor is back to `None`,
33    /// potentially leading to an infinite pagination loop).
34    fn update_params(&mut self, params: &mut Value);
35}
36
/// The subset of a list response that the client needs in order to paginate.
#[doc(hidden)]
#[derive(Debug)]
pub struct ListParts<T> {
    /// The `count` value carried by the list, if any.
    pub count: Option<u64>,
    /// URL the list was fetched from; follow-up page requests are sent to it.
    pub url: String,
    /// Elements of the current page.
    pub data: Vec<T>,
    /// Whether further pages should be requested after this one.
    pub has_more: bool,
}
46
47impl<T> PaginableList for List<T>
48where
49    T: Object,
50    List<T>: Deserialize,
51{
52    type Data = T;
53
54    fn into_parts(self) -> ListParts<Self::Data> {
55        ListParts {
56            count: self.count,
57            url: self.url,
58            data: self.data,
59            has_more: self.has_more,
60        }
61    }
62
63    fn from_parts(parts: ListParts<Self::Data>) -> Self {
64        Self {
65            data: parts.data,
66            has_more: parts.has_more,
67            count: parts.count,
68            url: parts.url,
69        }
70    }
71
72    fn update_params(&mut self, params: &mut Value) {
73        if let Some(new_cursor) = self.data.last().and_then(|l| l.id().as_cursor_opt()) {
74            params["starting_after"] = Value::String(new_cursor.into());
75        } else {
76            self.has_more = false;
77        }
78    }
79}
80
81/// An extension trait to allow converting `List<T>` into
82/// a type that can be paginated. Not meant to be implemented by any other types.
83pub trait PaginationExt {
84    /// The underlying pagination type, e.g. `List<T>`
85    type Data;
86
87    /// Use the current page state to construct an adaptor capable of paginating
88    /// from where the current data left off.
89    fn into_paginator(self) -> ListPaginator<Self::Data>;
90}
91
92impl<T> PaginationExt for List<T>
93where
94    T: Sync + Send + 'static,
95    List<T>: PaginableList,
96{
97    type Data = List<T>;
98
99    fn into_paginator(mut self) -> ListPaginator<List<T>> {
100        let mut params = Default::default();
101        self.update_params(&mut params);
102        ListPaginator { page: self, params }
103    }
104}
105
106/// Stream designed to support pagination.
107#[derive(Debug)]
108pub struct ListPaginator<T> {
109    page: T,
110    params: Value,
111}
112
113impl<T> ListPaginator<List<T>> {
114    /// Kept public so that the generated code crates can access this trait. Used by `List*` params
115    /// to implement `PaginationExt`. Not part of the public API.
116    #[doc(hidden)]
117    pub fn new_list(url: impl Into<String>, params: impl Serialize) -> Self {
118        let page = List { data: vec![], has_more: true, count: None, url: url.into() };
119        Self {
120            page,
121            params: serde_json::to_value(params)
122                .expect("all types implement `Serialize` infallibly"),
123        }
124    }
125}
126
127fn req_builder(url: &str) -> RequestBuilder {
128    RequestBuilder::new(PayjpMethod::Get, url.trim_start_matches("/v1"))
129}
130
131impl<T> ListPaginator<T>
132where
133    T: Sync + Send + 'static + PaginableList,
134{
135    /// Repeatedly queries for more data until all elements in list are fetched, using
136    ///
137    /// # Errors
138    /// If any pagination request returns an error.
139    pub fn get_all<C: BlockingClient>(self, client: &C) -> Result<Vec<T::Data>, C::Err> {
140        let mut data = vec![];
141        let mut parts = self.page.into_parts();
142        let mut params = self.params;
143        loop {
144            // `append` empties `parts.data`
145            data.append(&mut parts.data);
146
147            if !parts.has_more {
148                break;
149            }
150
151            let req = req_builder(&parts.url).query(&params);
152            let mut next_page: T = req.customize().send_blocking(client)?;
153            next_page.update_params(&mut params);
154            parts = next_page.into_parts();
155        }
156        Ok(data)
157    }
158
159    /// Get all values in this List, consuming self and lazily paginating until all values are fetched.
160    ///
161    /// This function repeatedly queries for more data until all elements in list are fetched.
162    pub fn stream<C: PayjpClient + Clone>(
163        self,
164        client: &C,
165    ) -> impl futures_util::Stream<Item = Result<T::Data, C::Err>> + Unpin {
166        // We are going to be popping items off the end of the list, so we need to reverse it.
167        let mut page = self.page.into_parts();
168        page.data.reverse();
169        let paginator = ListPaginator { page: T::from_parts(page), params: self.params };
170
171        Box::pin(futures_util::stream::unfold(
172            Some((paginator, client.clone())),
173            Self::unfold_stream,
174        ))
175    }
176
177    /// Unfold a single item from the stream.
178    async fn unfold_stream<C: PayjpClient + Clone>(
179        state: Option<(Self, C)>,
180    ) -> Option<(Result<T::Data, C::Err>, Option<(Self, C)>)> {
181        let (paginator, client) = state?; // If none, our last request was an error, so stop here
182        let mut parts = paginator.page.into_parts();
183        if let Some(next_val) = parts.data.pop() {
184            // We have more data on this page
185            return Some((
186                Ok(next_val),
187                Some((Self { page: T::from_parts(parts), params: paginator.params }, client)),
188            ));
189        }
190
191        // Final value of the stream, no errors
192        if !parts.has_more {
193            return None;
194        }
195
196        let req = req_builder(&parts.url).query(&paginator.params);
197        match req.customize::<T>().send(&client).await {
198            Ok(mut next_page) => {
199                let mut params = paginator.params;
200                next_page.update_params(&mut params);
201
202                let mut parts = next_page.into_parts();
203
204                // We are going to be popping items off the end of the list, so we need to reverse it.
205                parts.data.reverse();
206
207                let next_val = parts.data.pop()?;
208
209                // Yield last value of this page, the next page (and client) becomes the state
210                Some((Ok(next_val), Some((Self { page: T::from_parts(parts), params }, client))))
211            }
212            Err(err) => Some((Err(err), None)), // We ran into an error. The last value of the stream will be the error.
213        }
214    }
215}