use std::sync::Weak;
use bson::Document;
use serde::de::DeserializeOwned;
use crate::{ClientCursor, Error, Result};
use crate::db::db_inner::DatabaseInner;
use crate::transaction::TransactionInner;
/// A pending aggregation that has been described but not yet executed.
///
/// The value bundles everything [`Aggregate::run`] hands to the database:
/// a weak handle to the database internals, a borrowed name, the pipeline
/// stages, and an optional borrowed transaction. `T` is the type the
/// resulting cursor is parameterized with and defaults to [`Document`].
pub struct Aggregate<'a, 'b, T = Document>
where
    T: DeserializeOwned + Send + Sync,
{
    /// Weak handle to the database; upgraded only when the aggregation runs.
    db: Weak<DatabaseInner>,
    /// Name forwarded to the database when running
    /// (presumably the collection name — confirm against the caller).
    name: &'a str,
    /// Pipeline stages, forwarded unchanged to the database.
    pipeline: Vec<Document>,
    /// Transaction to run in, if the caller supplied one.
    txn: Option<&'b TransactionInner>,
    /// Carries the result type `T` without storing a value of it.
    _phantom: std::marker::PhantomData<T>,
}
impl<'a, 'b, T: DeserializeOwned + Send + Sync> Aggregate<'a, 'b, T> {
    /// Creates an aggregation description from its parts.
    ///
    /// Nothing is executed here; the values are only stored until
    /// [`Aggregate::run`] is called.
    pub(crate) fn new(
        db: Weak<DatabaseInner>,
        name: &'a str,
        pipeline: Vec<Document>,
        txn: Option<&'b TransactionInner>,
    ) -> Self {
        Self {
            db,
            name,
            pipeline,
            txn,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Executes the aggregation and returns a cursor over the results.
    ///
    /// If a transaction was supplied at construction time a clone of it is
    /// used; otherwise a new transaction is started on the database.
    ///
    /// # Errors
    ///
    /// Returns [`Error::DbIsClosed`] when the weak database handle can no
    /// longer be upgraded. Errors from starting a transaction or from the
    /// database's aggregation call are propagated unchanged.
    pub fn run(self) -> Result<ClientCursor<T>> {
        let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;

        let txn = match self.txn {
            Some(txn) => txn.clone(),
            None => db.start_transaction()?,
        };

        // `txn` is already an owned value and is not used afterwards, so it
        // is moved into the call instead of being cloned a second time.
        db.aggregate_with_owned_session(self.name, self.pipeline, txn)
    }

    /// Re-targets the aggregation at a different result type `U`.
    ///
    /// All stored state (database handle, name, pipeline, transaction) is
    /// moved over unchanged; only the type parameter differs.
    pub fn with_type<U>(self) -> Aggregate<'a, 'b, U>
    where
        U: DeserializeOwned + Send + Sync,
    {
        Aggregate {
            db: self.db,
            name: self.name,
            pipeline: self.pipeline,
            txn: self.txn,
            _phantom: std::marker::PhantomData,
        }
    }
}