use std::sync::Weak;
use bson::{Document, doc};
use serde::de::DeserializeOwned;
use crate::db::db_inner::DatabaseInner;
use crate::{ClientCursor, Error, Result};
use crate::transaction::TransactionInner;
/// Builder describing a single find query.
///
/// A value is created through `Find::new`, optionally refined with the
/// `skip`, `limit` and `sort` methods, and finally consumed by `run`,
/// which yields a cursor over values of type `T`.
pub struct Find<'a, 'b, T>
where
    T: DeserializeOwned + Send + Sync,
{
    /// Weak handle to the database; it is upgraded when the query runs.
    db: Weak<DatabaseInner>,
    /// Name handed to the database's find/aggregate calls
    /// (presumably the collection name — confirm against the caller).
    name: &'a str,
    /// Transaction supplied by the caller, if any.
    txn: Option<&'b TransactionInner>,
    /// Filter document used to match results.
    filter: Document,
    /// Value for the `$skip` stage, when set.
    skip: Option<u64>,
    /// Value for the `$limit` stage, when set.
    limit: Option<u64>,
    /// Document for the `$sort` stage, when set.
    sort: Option<Document>,
    /// Ties the builder to the result type `T` without storing a value of it.
    _phantom: std::marker::PhantomData<T>,
}
impl <'a, 'b , T: DeserializeOwned + Send + Sync> Find<'a, 'b, T> {
pub(crate) fn new(db: Weak<DatabaseInner>, name: &'a str, txn: Option<&'b TransactionInner>, filter: Document) -> Find<'a, 'b, T> {
Find {
db,
name,
txn,
filter,
skip: None,
limit: None,
sort: None,
_phantom: Default::default(),
}
}
pub fn skip(mut self, skip: u64) -> Self {
self.skip = Some(skip);
self
}
pub fn limit(mut self, limit: u64) -> Self {
self.limit = Some(limit);
self
}
pub fn sort(mut self, sort: Document) -> Self {
self.sort = Some(sort);
self
}
pub fn run(self) -> Result<ClientCursor<T>> {
let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
let txn = match self.txn {
Some(txn) => txn.clone(),
None => {
db.start_transaction()?
}
};
match (self.skip.as_ref(), self.limit.as_ref(), self.sort.as_ref()) {
(None, None, None) => {
db.find_with_owned_session(self.name, self.filter, txn)
}
_ => {
let mut pipeline = vec![
doc! {
"$match": self.filter
}
];
if let Some(sort) = self.sort {
pipeline.push(doc! {
"$sort": sort
});
}
if let Some(skip) = self.skip {
pipeline.push(doc! {
"$skip": skip as i64,
});
}
if let Some(limit) = self.limit {
pipeline.push(doc! {
"$limit": limit as i64,
});
}
db.aggregate_with_owned_session(self.name, pipeline, txn)
}
}
}
}