chewdata/connector/paginator/curl/
cursor.rs

//! Cursor paginator: reads the token for the next request from the current response. Since every request depends on the previous response, this paginator cannot be parallelized.
2//!
3//! ### Configuration
4//!
//! | key        | alias | Description                                                         | Default Value | Possible Values |
//! | ---------- | ----- | ------------------------------------------------------------------- | ------------- | --------------- |
//! | type       | -     | Required in order to use this paginator.                            | `cursor`      | `cursor`        |
//! | limit      | -     | Maximum number of records to retrieve per request.                  | `100`         | Unsigned number |
//! | entry_path | -     | Path, in the response body, of the token used for the next request. | `/next`       | String          |
//! | next       | -     | Token for the first request, to start at a specific position.       | `null`        | String          |
//!
//! The current token and the limit are exposed to the connector templates as `{{ paginator.next }}` and `{{ paginator.limit }}`.
//!
12//! ### Example
13//!
14//! ```json
15//! [
16//!     {
17//!         "type": "read",
18//!         "connector":{
19//!             "type": "curl",
20//!             "endpoint": "{{ CURL_ENDPOINT }}",
21//!             "path": "/get?next={{ paginator.next }}",
22//!             "method": "get",
23//!             "paginator": {
24//!                 "type": "cursor",
25//!                 "limit": 10,
26//!                 "entry_path": "/next",
27//!                 "next": "e5f705e2-5ed8-11ed-9b6a-0242ac120002"
28//!             }
29//!         }
30//!     }
31//! ]
32//! ```
33//!
//! Example response body, where the token for the next request is read from `/next`:
35//!
36//! ```json
37//! {
38//!     "data": [
39//!         ...
40//!     ],
41//!     "previous": "22d05674-5ed6-11ed-9b6a-0242ac120002",
42//!     "next": "274b5dac-5ed6-11ed-9b6a-0242ac120002"
43//! }
44//! ```
45use crate::connector::Connector;
46use crate::{connector::curl::Curl, ConnectorStream};
47use smol::stream::StreamExt;
48use async_stream::stream;
49use json_value_merge::Merge;
50use serde::{Deserialize, Serialize};
51use serde_json::Value;
52use std::io::Result;
53
54#[derive(Debug, Deserialize, Serialize, Clone)]
55#[serde(default, deny_unknown_fields)]
56pub struct Cursor {
57    pub limit: usize,
58    pub entry_path: String,
59    #[serde(rename = "next")]
60    pub next_token: Option<String>,
61}
62
63impl Default for Cursor {
64    fn default() -> Self {
65        Cursor {
66            limit: 100,
67            next_token: None,
68            entry_path: "/next".to_string(),
69        }
70    }
71}
72
73impl Cursor {
74    /// Cursor paginate.
75    ///
76    /// # Examples
77    ///
78    /// ```no_run
79    /// use chewdata::connector::{curl::Curl, Connector};
80    /// use chewdata::connector::paginator::curl::{PaginatorType, cursor::Cursor};
81    /// use smol::prelude::*;
82    /// use std::io;
83    ///
84    /// use macro_rules_attribute::apply;
85    /// use smol_macros::main;
86    /// 
87    /// #[apply(main!)]
88    /// async fn main() -> io::Result<()> {
89    ///     let mut connector = Curl::default();
90    ///     connector.endpoint = "http://localhost:8080".to_string();
91    ///     connector.method = "GET".into();
92    ///     connector.path = "/uuid?next={{ paginator.next }}".to_string();
93    ///
94    ///     let paginator = Cursor {
95    ///         limit: 1,
96    ///         entry_path: "/uuid".to_string(),
97    ///         ..Default::default()
98    ///     };
99    ///
100    ///     let mut paging = paginator.paginate(&connector).await?;
101    ///
102    ///     assert!(paging.next().await.transpose()?.is_some());
103    ///     assert!(paging.next().await.transpose()?.is_some());
104    ///
105    ///     Ok(())
106    /// }
107    /// ```
108    #[instrument(name = "cursor::paginate")]
109    pub async fn paginate(&self, connector: &Curl) -> Result<ConnectorStream> {
110        let connector = connector.clone();
111        let limit = self.limit;
112        let entry_path = self.entry_path.clone();
113        let mut next_token_opt = self.next_token.clone();
114
115        let mut document = connector.document()?.clone();
116        document.set_entry_path(entry_path.clone());
117
118        let stream = Box::pin(stream! {
119            let mut has_next = true;
120            
121            while has_next {
122                let mut new_connector = connector.clone();
123                new_connector.set_document(document.clone())?;
124
125                let mut new_parameters = connector.parameters.clone();
126
127                if let Some(next_token) = next_token_opt {
128                    new_parameters.merge_in("/paginator/next", &Value::String(next_token))?;
129                } else {
130                    new_parameters.merge_in("/paginator/next", &Value::String("".to_string()))?;
131                }
132
133                new_parameters
134                    .merge_in("/paginator/limit", &Value::String(limit.to_string()))?;
135                new_connector.set_parameters(new_parameters);
136
137                let mut dataset = match new_connector.fetch().await? {
138                    Some(dataset) => dataset,
139                    None => break
140                };
141
142                let data_opt = dataset.next().await;
143
144                let value = match data_opt {
145                    Some(data) => data.to_value(),
146                    None => Value::Null,
147                };
148
149                next_token_opt = match value {
150                    Value::Number(_) => Some(value.to_string()),
151                    Value::String(string) => Some(string),
152                    _ => None,
153                };
154
155                if next_token_opt.is_none() {
156                    has_next = false;
157                }
158
159                trace!(connector = format!("{:?}", new_connector).as_str(), "Yield a new connector");
160                yield Ok(Box::new(new_connector) as Box<dyn Connector>);
161            }
162            trace!("Stop yielding new connector");
163        });
164
165        Ok(stream)
166    }
167}
168
#[cfg(test)]
mod tests {
    use crate::connector::curl::Curl;
    use crate::connector::paginator::curl::cursor::Cursor;
    use crate::connector::Connector;
    use crate::document::json::Json;
    use macro_rules_attribute::apply;
    use smol::stream::StreamExt;
    use smol_macros::test;

    /// Two consecutive pages must expose different first records.
    #[apply(test!)]
    async fn paginate() {
        let mut curl = Curl::default();
        curl.endpoint = "http://localhost:8080".to_string();
        curl.method = "GET".into();
        curl.path = "/uuid?next={{ paginator.next }}".to_string();
        curl.set_document(Box::new(Json::default())).unwrap();

        let cursor = Cursor {
            limit: 1,
            entry_path: "/uuid".to_string(),
            ..Default::default()
        };
        let mut pages = cursor.paginate(&curl).await.unwrap();

        // First record of each of the first two pages.
        let mut records = Vec::with_capacity(2);
        for _ in 0..2 {
            let page = pages.next().await.transpose().unwrap();
            assert!(page.is_some());
            let mut dataset = page.unwrap().fetch().await.unwrap().unwrap();
            records.push(dataset.next().await.unwrap());
        }

        assert!(
            records[0] != records[1],
            "The content of this two stream are not different."
        );
    }
}