use serde::Serialize;
use bson::Document;
use std::borrow::Borrow;
use std::sync::Weak;
use serde::de::DeserializeOwned;
use crate::options::UpdateOptions;
use crate::{Error, IndexModel, Result};
use crate::db::db_inner::DatabaseInner;
use crate::action::{Aggregate, Find};
use crate::results::{DeleteResult, InsertManyResult, InsertOneResult, UpdateResult};
/// Unwraps the `Ok` value of `$action`.
///
/// If `$action` is an `Err`, the enclosing function returns early with
/// `$err.add(..)` applied to that failure, so the error that was already in
/// flight (`$err`) and the new one are reported together.
macro_rules! try_multiple {
    ($err:expr, $action:expr) => {
        match $action {
            Err(secondary) => return Err($err.add(secondary)),
            Ok(value) => value,
        }
    };
}
/// Runs a database operation inside the transaction `$txn`.
///
/// * On success the transaction is committed (a commit failure is propagated
///   with `?`) and the operation's value becomes the value of the macro.
/// * On failure the transaction is rolled back and the enclosing function
///   returns the operation's error. If the rollback fails as well, both
///   errors are merged through `try_multiple!` before returning.
macro_rules! try_db_op {
    ($txn:expr, $action:expr) => {
        match $action {
            Err(op_err) => {
                // A failing rollback returns early with the combined error.
                try_multiple!(op_err, $txn.rollback());
                return Err(op_err);
            }
            Ok(value) => {
                $txn.commit()?;
                value
            }
        }
    };
}
pub trait CollectionT<T> {
fn name(&self) -> &str;
fn count_documents(&self) -> Result<u64>;
fn update_one(&self, query: Document, update: Document) -> Result<UpdateResult>;
fn update_one_with_options(&self, query: Document, update: Document, options: UpdateOptions) -> Result<UpdateResult>;
fn update_many(&self, query: Document, update: Document) -> Result<UpdateResult>;
fn update_many_with_options(&self, query: Document, update: Document, options: UpdateOptions) -> Result<UpdateResult>;
fn delete_one(&self, query: Document) -> Result<DeleteResult>;
fn delete_many(&self, query: Document) -> Result<DeleteResult>;
fn create_index(&self, index: IndexModel) -> Result<()>;
fn drop_index(&self, name: impl AsRef<str>) -> Result<()>;
fn drop(&self) -> Result<()>;
fn insert_one(&self, doc: impl Borrow<T>) -> Result<InsertOneResult>
where T: Serialize;
fn insert_many(&self, docs: impl IntoIterator<Item = impl Borrow<T>>) -> Result<InsertManyResult>
where T: Serialize;
fn find(&self, filter: Document) -> Find<'_, '_, T>
where T: DeserializeOwned + Send + Sync;
fn find_one(&self, filter: Document) -> Result<Option<T>>
where T: DeserializeOwned + Send + Sync;
fn aggregate(&self, pipeline: impl IntoIterator<Item = Document>) -> Aggregate<'_, '_>;
}
/// A typed handle to one named collection of a database.
///
/// The handle does not keep the database alive: it only holds a [`Weak`]
/// reference to the shared database state.
pub struct Collection<T> {
    /// Non-owning reference to the database this collection belongs to.
    db: Weak<DatabaseInner>,
    /// Name used to address the collection in every database call.
    name: String,
    /// Ties the handle to its document type without storing a `T`.
    _phantom: std::marker::PhantomData<T>,
}
impl<T> Collection<T>
{
pub(crate) fn new(db: Weak<DatabaseInner>, name: &str) -> Collection<T> {
Collection {
db,
name: name.into(),
_phantom: std::default::Default::default(),
}
}
}
impl<T> CollectionT<T> for Collection<T> {
fn name(&self) -> &str {
&self.name
}
fn count_documents(&self) -> Result<u64> {
let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
let txn = db.start_transaction()?;
let count = db.count_documents(&self.name, &txn)?;
Ok(count)
}
fn update_one(&self, query: Document, update: Document) -> Result<UpdateResult> {
let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
let txn = db.start_transaction()?;
let result = try_db_op!(txn, db.update_one(
&self.name,
query,
update,
UpdateOptions::default(),
&txn,
));
Ok(result)
}
fn update_one_with_options(&self, query: Document, update: Document, options: UpdateOptions) -> Result<UpdateResult> {
let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
let txn = db.start_transaction()?;
let result = try_db_op!(txn, db.update_one(
&self.name,
query,
update,
options,
&txn,
));
Ok(result)
}
fn update_many(&self, query: Document, update: Document) -> Result<UpdateResult> {
let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
let txn = db.start_transaction()?;
let result = try_db_op!(txn, db.update_many(
&self.name,
query,
update,
UpdateOptions::default(),
&txn,
));
Ok(result)
}
fn update_many_with_options(&self, query: Document, update: Document, options: UpdateOptions) -> Result<UpdateResult> {
let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
let txn = db.start_transaction()?;
let result = try_db_op!(txn, db.update_many(
&self.name,
query,
update,
options,
&txn,
));
Ok(result)
}
fn delete_one(&self, query: Document) -> Result<DeleteResult> {
let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
let txn = db.start_transaction()?;
let result = try_db_op!(txn, db.delete_one(&self.name, query, &txn));
Ok(result)
}
fn delete_many(&self, query: Document) -> Result<DeleteResult> {
let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
let txn = db.start_transaction()?;
let result = try_db_op!(txn, db.delete_many(&self.name, query, &txn));
Ok(result)
}
fn create_index(&self, index: IndexModel) -> Result<()> {
let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
let txn = db.start_transaction()?;
try_db_op!(txn, db.create_index(&self.name, index, &txn));
Ok(())
}
fn drop_index(&self, name: impl AsRef<str>) -> Result<()> {
let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
let txn = db.start_transaction()?;
try_db_op!(txn, db.drop_index(&self.name, name.as_ref(), &txn));
Ok(())
}
fn drop(&self) -> Result<()> {
let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
let txn = db.start_transaction()?;
try_db_op!(txn, db.drop_collection(&self.name, &txn));
Ok(())
}
fn insert_one(&self, doc: impl Borrow<T>) -> Result<InsertOneResult>
where T: Serialize {
let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
let txn = db.start_transaction()?;
let result = try_db_op!(txn, db.insert_one(
&self.name,
bson::to_document(doc.borrow())?,
&txn,
));
Ok(result)
}
fn insert_many(&self, docs: impl IntoIterator<Item = impl Borrow<T>>) -> Result<InsertManyResult>
where T: Serialize {
let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
let txn = db.start_transaction()?;
let result = try_db_op!(txn, db.insert_many(&self.name, docs, &txn));
Ok(result)
}
fn find(&self, filter: Document) -> Find<T>
where T: DeserializeOwned + Send + Sync {
Find::new(self.db.clone(), &self.name, None, filter)
}
fn find_one(&self, filter: Document) -> Result<Option<T>>
where T: DeserializeOwned + Send + Sync {
let mut cursor = self.find(filter).run()?;
let test = cursor.advance()?;
if !test {
return Ok(None);
}
Ok(Some(cursor.deserialize_current()?))
}
fn aggregate(&self, pipeline: impl IntoIterator<Item = Document>) -> Aggregate<'_, '_> {
Aggregate::new(
self.db.clone(),
&self.name,
pipeline.into_iter().collect(),
None,
)
}
}