use crate::connector::Connector;
use crate::{connector::curl::Curl, ConnectorStream};
use async_stream::stream;
use json_value_merge::Merge;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use smol::stream::StreamExt;
use std::io::Result;
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(default, deny_unknown_fields)]
pub struct Cursor {
pub limit: usize,
pub entry_path: String,
#[serde(rename = "next")]
pub next_token: Option<String>,
}
impl Default for Cursor {
fn default() -> Self {
Cursor {
limit: 100,
next_token: None,
entry_path: "/next".to_string(),
}
}
}
impl Cursor {
#[instrument(name = "cursor::paginate")]
pub async fn paginate(&self, connector: &Curl) -> Result<ConnectorStream> {
let connector = connector.clone();
let limit = self.limit;
let entry_path = self.entry_path.clone();
let mut next_token_opt = self.next_token.clone();
let mut document = connector.document()?.clone_box();
document.set_entry_path(entry_path.clone());
let stream = Box::pin(stream! {
let mut has_next = true;
while has_next {
let mut new_connector = connector.clone();
new_connector.set_document(document.clone_box())?;
let mut new_parameters = connector.parameters.clone();
if let Some(next_token) = next_token_opt {
new_parameters.merge_in("/paginator/next", &Value::String(next_token))?;
} else {
new_parameters.merge_in("/paginator/next", &Value::String("".to_string()))?;
}
new_parameters
.merge_in("/paginator/limit", &Value::String(limit.to_string()))?;
new_connector.set_parameters(new_parameters);
let mut dataset = match new_connector.fetch().await? {
Some(dataset) => dataset,
None => break
};
let data_opt = dataset.next().await;
let value = match data_opt {
Some(data) => data.to_value(),
None => Value::Null,
};
next_token_opt = match value {
Value::Number(_) => Some(value.to_string()),
Value::String(string) => Some(string),
_ => None,
};
if next_token_opt.is_none() {
has_next = false;
}
trace!(connector = format!("{:?}", new_connector).as_str(), "Yield a new connector");
yield Ok(Box::new(new_connector) as Box<dyn Connector>);
}
trace!("Stop yielding new connector");
});
Ok(stream)
}
}
#[cfg(test)]
mod tests {
use crate::connector::curl::Curl;
use crate::connector::paginator::curl::cursor::Cursor;
use crate::connector::Connector;
use crate::document::json::Json;
use http::Method;
use macro_rules_attribute::apply;
use smol::stream::StreamExt;
use smol_macros::test;
#[apply(test!)]
async fn paginate() {
let document = Json::default();
let mut connector = Curl::default();
connector.endpoint = "http://localhost:8080".to_string();
connector.method = Method::GET;
connector.path = "/uuid?next={{ paginator.next }}".to_string();
connector.set_document(Box::new(document)).unwrap();
let paginator = Cursor {
limit: 1,
entry_path: "/uuid".to_string(),
..Default::default()
};
let mut paging = paginator.paginate(&connector).await.unwrap();
let connector = paging.next().await.transpose().unwrap();
assert!(connector.is_some());
let mut datastream = connector.unwrap().fetch().await.unwrap().unwrap();
let data_1 = datastream.next().await.unwrap();
let connector = paging.next().await.transpose().unwrap();
assert!(connector.is_some());
let mut datastream = connector.unwrap().fetch().await.unwrap().unwrap();
let data_2 = datastream.next().await.unwrap();
assert!(
data_1 != data_2,
"The content of this two stream are not different."
);
}
}