use super::{
MongodbConfigStore,
serde::{transform_from_mongo, transform_to_mongo},
};
use crate::storage::Error;
use crate::storage::config_store::config_store::FindOptions;
use futures::TryStreamExt;
use mongodb::{ClientSession, Collection, bson::Document};
use pbbson::Model;
use std::fmt::Debug;
use std::str::FromStr;
/// Fetches every document in the collection named by `bucket` that matches
/// `options.filter`, returning each one converted through `transform_from_mongo`.
///
/// # Parameters
///
/// * `this` - store whose `db` handle is queried.
/// * `session` - when `Some`, both the query and the cursor iteration are bound
///   to this client session; when `None`, the query runs without a session.
/// * `bucket` - its `to_string()` value is used as the collection name.
/// * `options` - query options:
///   * `filter` is passed through `transform_to_mongo` and used as the query filter.
///   * `pagination` is an `(offset, limit)` pair; each half is applied as
///     `skip` / `limit` only when it is `Some`.
///   * `ordering`, when present, is passed through `transform_to_mongo` and
///     used as the sort document.
///   * `projection`, when present, has its `id` key renamed to `_id` before
///     being sent to the server.
///
/// # Errors
///
/// Propagates any error produced by `transform_to_mongo`, by the MongoDB driver
/// while running the query or draining the cursor, or by `transform_from_mongo`.
pub async fn find_many<B: Clone + Debug + FromStr + Send + Sync + ToString>(
    this: &MongodbConfigStore<B>,
    session: Option<&mut ClientSession>,
    bucket: B,
    options: FindOptions,
) -> Result<Vec<Model>, Error> {
    let coll: Collection<Document> = this.db.collection(&bucket.to_string());

    let filter = transform_to_mongo(options.filter)?;
    let mut find_future = coll.find(filter.into());

    let (offset, limit) = options.pagination;
    if let Some(offset) = offset {
        find_future = find_future.skip(offset.into());
    }
    if let Some(limit) = limit {
        find_future = find_future.limit(limit.into());
    }

    if let Some(ordering) = options.ordering {
        let ordering = transform_to_mongo(ordering)?;
        find_future = find_future.sort(ordering.into());
    }

    // `options` is owned, so the projection can be adjusted in place without cloning.
    if let Some(mut projection) = options.projection {
        // Callers address the identifier as `id`; the query is sent with `_id` instead.
        if let Some(id) = projection.remove("id") {
            projection.insert("_id".to_string(), id);
        }
        find_future = find_future.projection(projection.into());
    }

    let documents: Vec<Document> = if let Some(session) = session {
        // Reborrow for the query so the same session can be handed to the cursor after.
        let mut cursor = find_future.session(&mut *session).await?;
        cursor.stream(session).try_collect().await?
    } else {
        let cursor = find_future.await?;
        cursor.try_collect().await?
    };

    let mut models = Vec::with_capacity(documents.len());
    for document in documents {
        models.push(transform_from_mongo(document.into())?);
    }
    Ok(models)
}