use crate::{
connector::{mongodb::Mongodb, Connector},
ConnectorStream,
};
use async_compat::{Compat, CompatExt};
use async_stream::stream;
use mongodb::bson::{doc, Document};
use serde::{Deserialize, Serialize};
use smol::stream::StreamExt;
use std::io::{Error, ErrorKind, Result};
/// Pagination settings used to read a MongoDB collection page by page.
///
/// When deserialized, absent fields take the values of the `Default` implementation
/// and unknown fields are rejected.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Cursor {
    /// Page size: the `limit` set on the find options of every connector yielded by
    /// `paginate`.
    pub limit: usize,
    /// Number of documents skipped by the query that sizes the pagination.
    pub skip: usize,
}
impl Default for Cursor {
fn default() -> Self {
Cursor {
limit: 100,
skip: 0,
}
}
}
impl Cursor {
    /// Split a MongoDB `find` into a stream of page-sized connectors.
    ///
    /// The connector's query is executed once, starting at `self.skip`, and the items
    /// produced by the resulting cursor are counted. The returned stream then yields one
    /// clone of `connector` per page; each clone has its `find_options` set to read at
    /// most `self.limit` documents starting at `self.skip` plus the page offset.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::InvalidInput` if `self.limit` is `0` (a page must hold at least one
    ///   document).
    /// * `ErrorKind::Interrupted` if the client cannot be created or the query fails.
    /// * The converted `serde_json` error if the connector's filter cannot be turned
    ///   into a BSON `Document`.
    #[instrument(name = "cursor::paginate")]
    pub async fn paginate(&self, connector: &Mongodb) -> Result<ConnectorStream> {
        let skip = self.skip;
        let batch_size = self.limit;

        // A page size of zero can never advance through the collection, and it would
        // make the page stepping below panic, so reject it before touching the database.
        if batch_size == 0 {
            return Err(Error::new(
                ErrorKind::InvalidInput,
                "The cursor paginator requires a 'limit' greater than 0",
            ));
        }

        // Owned copy: the stream outlives the borrowed connector.
        let connector = connector.clone();

        let mut options = (*connector.find_options.clone()).unwrap_or_default();
        options.skip = Some(skip as u64);

        // The filter is round-tripped through its JSON text to obtain a BSON document.
        let filter: Document = match connector.filter(&connector.parameters) {
            Some(filter) => serde_json::from_str(filter.to_string().as_str())?,
            None => Document::new(),
        };

        let client = connector
            .client()
            .compat()
            .await
            .map_err(|e| Error::new(ErrorKind::Interrupted, e))?;
        let db = client.database(&connector.database);
        let collection = db.collection::<Document>(&connector.collection);

        let cursor = Compat::new(async {
            collection
                .find(filter)
                .with_options(Some(options))
                .await
                .map_err(|e| Error::new(ErrorKind::Interrupted, e))
        })
        .await?;

        // NOTE(review): counting consumes the whole cursor, so every matching document
        // is fetched once only to size the pagination — a server-side count may be
        // cheaper; confirm before changing.
        let cursor_size = cursor.count().await;

        Ok(Box::pin(stream! {
            // One connector per page: offsets 0, limit, 2 * limit, ... below the count.
            for offset in (0..cursor_size).step_by(batch_size) {
                let mut new_connector = connector.clone();
                let mut options = (*new_connector.find_options.clone()).unwrap_or_default();
                // The count was taken after `skip`, so page offsets are relative to it.
                options.skip = Some((skip + offset) as u64);
                options.limit = Some(batch_size as i64);
                new_connector.find_options = Box::new(Some(options));
                trace!(connector = format!("{:?}", new_connector).as_str(), "Yield a new connector");
                yield Ok(Box::new(new_connector) as Box<dyn Connector>);
            }
            trace!("Stop yielding new connector");
        }))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use macro_rules_attribute::apply;
    use smol_macros::test;

    /// Connector pointing at the `startup_log` collection of the `local` database on a
    /// MongoDB instance listening on localhost.
    fn startup_log_connector() -> Mongodb {
        let mut mongodb = Mongodb::default();
        mongodb.endpoint = "mongodb://admin:admin@localhost:27017".into();
        mongodb.database = "local".into();
        mongodb.collection = "startup_log".into();
        mongodb
    }

    /// With one document per page, at least two pages are expected.
    #[apply(test!)]
    async fn paginate() {
        let source = startup_log_connector();
        let paginator = Cursor { limit: 1, skip: 0 };
        let mut pages = paginator.paginate(&source).await.unwrap();

        let first_page = pages.next().await.transpose().unwrap();
        assert!(first_page.is_some());

        let second_page = pages.next().await.transpose().unwrap();
        assert!(second_page.is_some());
    }

    /// With the default page size, a single page is expected before the stream ends.
    #[apply(test!)]
    async fn paginate_to_end() {
        let source = startup_log_connector();
        let paginator = Cursor {
            skip: 0,
            ..Default::default()
        };
        let mut pages = paginator.paginate(&source).await.unwrap();

        let first_page = pages.next().await.transpose().unwrap();
        assert!(first_page.is_some());

        let after_last_page = pages.next().await.transpose().unwrap();
        assert!(after_last_page.is_none());
    }
}