chewdata/connector/paginator/curl/cursor.rs
1//! Retrieve the token for the next call from the response. The paginator cannot be parallelized.
2//!
3//! ### Configuration
4//!
5//! | key | alias | Description | Default Value | Possible Values |
6//! | ---------- | ----- | ---------------------------------------------------------------------------------- | ------------- | --------------- |
7//! | type | - | Required in order to use this paginator. | `cursor` | `cursor` |
8//! | limit | - | Limit of records to retrieve for each request. | `100` | Unsigned number |
9//! | entry_path | - | The entry path for capturing the token in the response's body. | `/next` | String |
//! | next | - | Force the pagination to start at a specific position. | `null` | String |
11//!
12//! ### Example
13//!
14//! ```json
15//! [
16//! {
17//! "type": "read",
18//! "connector":{
19//! "type": "curl",
20//! "endpoint": "{{ CURL_ENDPOINT }}",
21//! "path": "/get?next={{ paginator.next }}",
22//! "method": "get",
23//! "paginator": {
24//! "type": "cursor",
25//! "limit": 10,
26//! "entry_path": "/next",
27//! "next": "e5f705e2-5ed8-11ed-9b6a-0242ac120002"
28//! }
29//! }
30//! }
31//! ]
32//! ```
33//!
34//! Response body:
35//!
36//! ```json
37//! {
38//! "data": [
39//! ...
40//! ],
41//! "previous": "22d05674-5ed6-11ed-9b6a-0242ac120002",
42//! "next": "274b5dac-5ed6-11ed-9b6a-0242ac120002"
43//! }
44//! ```
45use crate::connector::Connector;
46use crate::{connector::curl::Curl, ConnectorStream};
47use smol::stream::StreamExt;
48use async_stream::stream;
49use json_value_merge::Merge;
50use serde::{Deserialize, Serialize};
51use serde_json::Value;
52use std::io::Result;
53
54#[derive(Debug, Deserialize, Serialize, Clone)]
55#[serde(default, deny_unknown_fields)]
56pub struct Cursor {
57 pub limit: usize,
58 pub entry_path: String,
59 #[serde(rename = "next")]
60 pub next_token: Option<String>,
61}
62
63impl Default for Cursor {
64 fn default() -> Self {
65 Cursor {
66 limit: 100,
67 next_token: None,
68 entry_path: "/next".to_string(),
69 }
70 }
71}
72
73impl Cursor {
74 /// Cursor paginate.
75 ///
76 /// # Examples
77 ///
78 /// ```no_run
79 /// use chewdata::connector::{curl::Curl, Connector};
80 /// use chewdata::connector::paginator::curl::{PaginatorType, cursor::Cursor};
81 /// use smol::prelude::*;
82 /// use std::io;
83 ///
84 /// use macro_rules_attribute::apply;
85 /// use smol_macros::main;
86 ///
87 /// #[apply(main!)]
88 /// async fn main() -> io::Result<()> {
89 /// let mut connector = Curl::default();
90 /// connector.endpoint = "http://localhost:8080".to_string();
91 /// connector.method = "GET".into();
92 /// connector.path = "/uuid?next={{ paginator.next }}".to_string();
93 ///
94 /// let paginator = Cursor {
95 /// limit: 1,
96 /// entry_path: "/uuid".to_string(),
97 /// ..Default::default()
98 /// };
99 ///
100 /// let mut paging = paginator.paginate(&connector).await?;
101 ///
102 /// assert!(paging.next().await.transpose()?.is_some());
103 /// assert!(paging.next().await.transpose()?.is_some());
104 ///
105 /// Ok(())
106 /// }
107 /// ```
108 #[instrument(name = "cursor::paginate")]
109 pub async fn paginate(&self, connector: &Curl) -> Result<ConnectorStream> {
110 let connector = connector.clone();
111 let limit = self.limit;
112 let entry_path = self.entry_path.clone();
113 let mut next_token_opt = self.next_token.clone();
114
115 let mut document = connector.document()?.clone();
116 document.set_entry_path(entry_path.clone());
117
118 let stream = Box::pin(stream! {
119 let mut has_next = true;
120
121 while has_next {
122 let mut new_connector = connector.clone();
123 new_connector.set_document(document.clone())?;
124
125 let mut new_parameters = connector.parameters.clone();
126
127 if let Some(next_token) = next_token_opt {
128 new_parameters.merge_in("/paginator/next", &Value::String(next_token))?;
129 } else {
130 new_parameters.merge_in("/paginator/next", &Value::String("".to_string()))?;
131 }
132
133 new_parameters
134 .merge_in("/paginator/limit", &Value::String(limit.to_string()))?;
135 new_connector.set_parameters(new_parameters);
136
137 let mut dataset = match new_connector.fetch().await? {
138 Some(dataset) => dataset,
139 None => break
140 };
141
142 let data_opt = dataset.next().await;
143
144 let value = match data_opt {
145 Some(data) => data.to_value(),
146 None => Value::Null,
147 };
148
149 next_token_opt = match value {
150 Value::Number(_) => Some(value.to_string()),
151 Value::String(string) => Some(string),
152 _ => None,
153 };
154
155 if next_token_opt.is_none() {
156 has_next = false;
157 }
158
159 trace!(connector = format!("{:?}", new_connector).as_str(), "Yield a new connector");
160 yield Ok(Box::new(new_connector) as Box<dyn Connector>);
161 }
162 trace!("Stop yielding new connector");
163 });
164
165 Ok(stream)
166 }
167}
168
#[cfg(test)]
mod tests {
    use super::Cursor;
    use crate::connector::curl::Curl;
    use crate::connector::Connector;
    use crate::document::json::Json;
    use macro_rules_attribute::apply;
    use smol::stream::StreamExt;
    use smol_macros::test;

    /// Two consecutive pages must return different data.
    ///
    /// The test sends GET requests to `/uuid` on `http://localhost:8080`.
    #[apply(test!)]
    async fn paginate() {
        let mut connector = Curl::default();
        connector.endpoint = "http://localhost:8080".to_string();
        connector.method = "GET".into();
        connector.path = "/uuid?next={{ paginator.next }}".to_string();
        connector.set_document(Box::new(Json::default())).unwrap();

        let paginator = Cursor {
            entry_path: "/uuid".to_string(),
            limit: 1,
            ..Default::default()
        };
        let mut pages = paginator.paginate(&connector).await.unwrap();

        // First page.
        let first_page = pages.next().await.transpose().unwrap();
        assert!(first_page.is_some());
        let mut first_dataset = first_page.unwrap().fetch().await.unwrap().unwrap();
        let first_record = first_dataset.next().await.unwrap();

        // Second page.
        let second_page = pages.next().await.transpose().unwrap();
        assert!(second_page.is_some());
        let mut second_dataset = second_page.unwrap().fetch().await.unwrap().unwrap();
        let second_record = second_dataset.next().await.unwrap();

        assert!(
            first_record != second_record,
            "The content of this two stream are not different."
        );
    }
}